Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(net): Try harder to drop connections when they shut down, Credit: Ziggurat Team #6832

Merged
merged 5 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5852,6 +5852,7 @@ dependencies = [
"howudoin",
"humantime-serde",
"indexmap",
"itertools",
"lazy_static",
"metrics 0.21.0",
"ordered-map",
Expand Down
1 change: 1 addition & 0 deletions zebra-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ chrono = { version = "0.4.25", default-features = false, features = ["clock", "s
hex = "0.4.3"
humantime-serde = "1.1.1"
indexmap = { version = "1.9.3", features = ["serde"] }
itertools = "0.10.5"
lazy_static = "1.4.0"
ordered-map = "0.4.2"
pin-project = "1.1.0"
Expand Down
3 changes: 3 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ pub const MIN_OVERLOAD_DROP_PROBABILITY: f32 = 0.05;
/// [`Overloaded`](crate::PeerError::Overloaded) error.
pub const MAX_OVERLOAD_DROP_PROBABILITY: f32 = 0.95;

/// The minimum interval between logging peer set status updates.
///
/// Currently 60 seconds.
pub const MIN_PEER_SET_LOG_INTERVAL: Duration = Duration::from_secs(60);

lazy_static! {
/// The minimum network protocol version accepted by this crate for each network,
/// represented as a network upgrade.
Expand Down
6 changes: 5 additions & 1 deletion zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,14 @@ impl Client {
// Prevent any senders from sending more messages to this peer.
self.server_tx.close_channel();

// Stop the heartbeat task
// Ask the heartbeat task to stop.
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(CancelHeartbeatTask);
}

// Force the connection and heartbeat tasks to stop.
self.connection_task.abort();
self.heartbeat_task.abort();
}
}

Expand Down
113 changes: 75 additions & 38 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,10 @@ impl From<Request> for InboundMessage {
}

/// The channels, services, and associated state for a peer connection.
pub struct Connection<S, Tx> {
pub struct Connection<S, Tx>
where
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
/// The metadata for the connected peer `service`.
///
/// This field is used for debugging.
Expand Down Expand Up @@ -519,7 +522,10 @@ pub struct Connection<S, Tx> {
last_overload_time: Option<Instant>,
}

impl<S, Tx> fmt::Debug for Connection<S, Tx> {
impl<S, Tx> fmt::Debug for Connection<S, Tx>
where
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// skip the channels, they don't tell us anything useful
f.debug_struct(std::any::type_name::<Connection<S, Tx>>())
Expand All @@ -534,7 +540,10 @@ impl<S, Tx> fmt::Debug for Connection<S, Tx> {
}
}

impl<S, Tx> Connection<S, Tx> {
impl<S, Tx> Connection<S, Tx>
where
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
/// Return a new connection from its channels, services, and shared state.
pub(crate) fn new(
inbound_service: S,
Expand Down Expand Up @@ -645,9 +654,9 @@ where
// the request completes (or times out).
match future::select(peer_rx.next(), self.client_rx.next()).await {
Either::Left((None, _)) => {
self.fail_with(PeerError::ConnectionClosed);
self.fail_with(PeerError::ConnectionClosed).await;
}
Either::Left((Some(Err(e)), _)) => self.fail_with(e),
Either::Left((Some(Err(e)), _)) => self.fail_with(e).await,
Either::Left((Some(Ok(msg)), _)) => {
let unhandled_msg = self.handle_message_as_request(msg).await;

Expand All @@ -663,7 +672,8 @@ where

// There are no requests to be flushed,
// but we need to set an error and update metrics.
self.shutdown(PeerError::ClientDropped);
// (We don't want to log this error, because it's normal behaviour.)
self.shutdown_async(PeerError::ClientDropped).await;
break;
}
Either::Right((Some(req), _)) => {
Expand Down Expand Up @@ -753,8 +763,10 @@ where
.instrument(span.clone())
.await
{
Either::Right((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Right((Some(Err(e)), _)) => self.fail_with(e),
Either::Right((None, _)) => {
self.fail_with(PeerError::ConnectionClosed).await
}
Either::Right((Some(Err(e)), _)) => self.fail_with(e).await,
Either::Right((Some(Ok(peer_msg)), _cancel)) => {
self.update_state_metrics(format!("Out::Rsp::{}", peer_msg.command()));

Expand Down Expand Up @@ -813,7 +825,7 @@ where
// So we do the state request cleanup manually.
let e = SharedPeerError::from(e);
let _ = tx.send(Err(e.clone()));
self.fail_with(e);
self.fail_with(e).await;
State::Failed
}
// Other request timeouts fail the request.
Expand All @@ -840,6 +852,8 @@ where
}
}

// TODO: close peer_rx here, after changing it from a stream to a channel

let error = self.error_slot.try_get_error();
assert!(
error.is_some(),
Expand All @@ -849,18 +863,21 @@ where
self.update_state_metrics(error.expect("checked is_some").to_string());
}

/// Fail this connection.
/// Fail this connection, log the failure, and shut it down.
/// See [`Self::shutdown_async()`] for details.
///
/// If the connection has errored already, re-use the original error.
/// Otherwise, fail the connection with `error`.
fn fail_with(&mut self, error: impl Into<SharedPeerError>) {
/// Use [`Self::shutdown_async()`] to avoid logging the failure,
/// and [`Self::shutdown()`] from non-async code.
async fn fail_with(&mut self, error: impl Into<SharedPeerError>) {
let error = error.into();

debug!(%error,
client_receiver = ?self.client_rx,
"failing peer service with error");
debug!(
%error,
client_receiver = ?self.client_rx,
"failing peer service with error"
);

self.shutdown(error);
self.shutdown_async(error).await;
}

/// Handle an internal client request, possibly generating outgoing messages to the
Expand Down Expand Up @@ -1052,7 +1069,7 @@ where
Err(error) => {
let error = SharedPeerError::from(error);
let _ = tx.send(Err(error.clone()));
self.fail_with(error);
self.fail_with(error).await;
}
};
}
Expand All @@ -1075,17 +1092,17 @@ where
Message::Ping(nonce) => {
trace!(?nonce, "responding to heartbeat");
if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
self.fail_with(e);
self.fail_with(e).await;
}
Consumed
}
// These messages shouldn't be sent outside of a handshake.
Message::Version { .. } => {
self.fail_with(PeerError::DuplicateHandshake);
self.fail_with(PeerError::DuplicateHandshake).await;
Consumed
}
Message::Verack { .. } => {
self.fail_with(PeerError::DuplicateHandshake);
self.fail_with(PeerError::DuplicateHandshake).await;
Consumed
}
// These messages should already be handled as a response if they
Expand Down Expand Up @@ -1267,7 +1284,7 @@ where
tokio::task::yield_now().await;

if self.svc.ready().await.is_err() {
self.fail_with(PeerError::ServiceShutdown);
self.fail_with(PeerError::ServiceShutdown).await;
return;
}

Expand Down Expand Up @@ -1312,7 +1329,7 @@ where
Response::Nil => { /* generic success, do nothing */ }
Response::Peers(addrs) => {
if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
self.fail_with(e);
self.fail_with(e).await;
}
}
Response::Transactions(transactions) => {
Expand All @@ -1324,7 +1341,7 @@ where
match transaction {
Available(transaction) => {
if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
self.fail_with(e);
self.fail_with(e).await;
return;
}
}
Expand All @@ -1334,7 +1351,7 @@ where

if !missing_ids.is_empty() {
if let Err(e) = self.peer_tx.send(Message::NotFound(missing_ids)).await {
self.fail_with(e);
self.fail_with(e).await;
return;
}
}
Expand All @@ -1348,7 +1365,7 @@ where
match block {
Available(block) => {
if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
self.fail_with(e);
self.fail_with(e).await;
return;
}
}
Expand All @@ -1358,7 +1375,7 @@ where

if !missing_hashes.is_empty() {
if let Err(e) = self.peer_tx.send(Message::NotFound(missing_hashes)).await {
self.fail_with(e);
self.fail_with(e).await;
return;
}
}
Expand All @@ -1369,12 +1386,12 @@ where
.send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
.await
{
self.fail_with(e)
self.fail_with(e).await
}
}
Response::BlockHeaders(headers) => {
if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await {
self.fail_with(e)
self.fail_with(e).await
}
}
Response::TransactionIds(hashes) => {
Expand Down Expand Up @@ -1402,7 +1419,7 @@ where
.collect();

if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await {
self.fail_with(e)
self.fail_with(e).await
}
}
}
Expand Down Expand Up @@ -1454,7 +1471,7 @@ where
);

self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Error", req.command()));
self.fail_with(PeerError::Overloaded);
self.fail_with(PeerError::Overloaded).await;
} else {
self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Ignored", req.command()));
metrics::counter!("pool.ignored.loadshed", 1);
Expand Down Expand Up @@ -1499,7 +1516,10 @@ fn overload_drop_connection_probability(now: Instant, prev: Option<Instant>) ->
raw_drop_probability.clamp(MIN_OVERLOAD_DROP_PROBABILITY, MAX_OVERLOAD_DROP_PROBABILITY)
}

impl<S, Tx> Connection<S, Tx> {
impl<S, Tx> Connection<S, Tx>
where
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
/// Update the connection state metrics for this connection,
/// using `extra_state_info` as additional state information.
fn update_state_metrics(&mut self, extra_state_info: impl Into<Option<String>>) {
Expand Down Expand Up @@ -1538,18 +1558,32 @@ impl<S, Tx> Connection<S, Tx> {
}
}

/// Marks the peer as having failed with `error`, and performs connection cleanup.
/// Marks the peer as having failed with `error`, and performs connection cleanup,
/// including async channel closes.
///
/// This first awaits `self.peer_tx.close()`, ignoring any error it returns,
/// and then runs the synchronous [`Self::shutdown()`] cleanup with `error`.
///
/// If the connection has errored already, re-use the original error.
/// Otherwise, fail the connection with `error`.
async fn shutdown_async(&mut self, error: impl Into<SharedPeerError>) {
// Close async channels first, so other tasks can start shutting down.
// There's nothing we can do about errors while shutting down, and some errors are expected.
//
// NOTE(review): this await is not wrapped in a timeout here; confirm that a
// stalled sink cannot block connection shutdown indefinitely.
//
// TODO: close peer_tx and peer_rx in shutdown() and Drop, after:
// - using channels instead of streams/sinks?
// - exposing the underlying implementation rather than using generics and closures?
// - adding peer_rx to the connection struct (optional)
let _ = self.peer_tx.close().await;

// Run the remaining synchronous cleanup, passing `error` through unchanged.
self.shutdown(error);
}

/// Marks the peer as having failed with `error`, and performs connection cleanup.
/// See [`Self::shutdown_async()`] for details.
///
/// Call [`Self::shutdown_async()`] in async code, because it can shut down more channels.
fn shutdown(&mut self, error: impl Into<SharedPeerError>) {
let mut error = error.into();

// Close channels first, so other tasks can start shutting down.
//
// TODO: close peer_tx and peer_rx, after:
// - adapting them using a struct with a Stream impl, rather than closures
// - making the struct forward `close` to the inner channel
self.client_rx.close();

// Update the shared error slot
Expand Down Expand Up @@ -1617,7 +1651,10 @@ impl<S, Tx> Connection<S, Tx> {
}
}

impl<S, Tx> Drop for Connection<S, Tx> {
impl<S, Tx> Drop for Connection<S, Tx>
where
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
fn drop(&mut self) {
self.shutdown(PeerError::ConnectionDropped);

Expand Down
27 changes: 24 additions & 3 deletions zebra-network/src/peer/connection/peer_tx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! The peer message sender channel.

use futures::{Sink, SinkExt};
use futures::{FutureExt, Sink, SinkExt};

use zebra_chain::serialization::SerializationError;

Expand All @@ -10,7 +10,10 @@ use crate::{constants::REQUEST_TIMEOUT, protocol::external::Message, PeerError};
///
/// Used to apply a timeout to send messages.
#[derive(Clone, Debug)]
pub struct PeerTx<Tx> {
pub struct PeerTx<Tx>
where
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
/// A channel for sending Zcash messages to the connected peer.
///
/// This channel accepts [`Message`]s.
Expand All @@ -28,10 +31,28 @@ where
.map_err(|_| PeerError::ConnectionSendTimeout)?
.map_err(Into::into)
}

/// Flush any remaining output and close this [`PeerTx`], if necessary.
///
/// Delegates to `close()` on the inner sink and returns its result unchanged.
///
/// # Errors
///
/// Returns the [`SerializationError`] produced by the inner sink's `close()`, if any.
///
/// NOTE(review): no timeout is applied in this method; callers that need a
/// bounded shutdown should confirm one is applied elsewhere.
pub async fn close(&mut self) -> Result<(), SerializationError> {
self.inner.close().await
}
}

impl<Tx> From<Tx> for PeerTx<Tx> {
impl<Tx> From<Tx> for PeerTx<Tx>
where
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
fn from(tx: Tx) -> Self {
PeerTx { inner: tx }
}
}

/// Attempts to close the inner sink when a [`PeerTx`] is dropped.
impl<Tx> Drop for PeerTx<Tx>
where
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
fn drop(&mut self) {
// Do a last-ditch close attempt on the sink.
//
// `drop` is not async, so the close future cannot be awaited here.
// Instead it is driven with `now_or_never()`, and the returned value
// (including any close error) is deliberately discarded.
self.close().now_or_never();
}
}
Loading