Skip to content

Commit

Permalink
Merge of #6790
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored May 31, 2023
2 parents af4d531 + ca23d02 commit 1510f7d
Show file tree
Hide file tree
Showing 5 changed files with 498 additions and 38 deletions.
19 changes: 19 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,25 @@ pub const EWMA_DECAY_TIME_NANOS: f64 = 200.0 * NANOS_PER_SECOND;
/// The number of nanoseconds in one second.
const NANOS_PER_SECOND: f64 = 1_000_000_000.0;

/// The duration it takes for the drop probability of an overloaded connection to
/// reach [`MIN_OVERLOAD_DROP_PROBABILITY`].
///
/// Peer connections that receive multiple overloads have a higher probability of being dropped.
///
/// The probability of a connection being dropped gradually decreases during this interval
/// until it reaches the default drop probability ([`MIN_OVERLOAD_DROP_PROBABILITY`]).
///
/// Increasing this number increases the rate at which connections are dropped.
pub const OVERLOAD_PROTECTION_INTERVAL: Duration = MIN_INBOUND_PEER_CONNECTION_INTERVAL;

/// The minimum probability of dropping a peer connection when it receives an
/// [`Overloaded`](crate::PeerError::Overloaded) error.
pub const MIN_OVERLOAD_DROP_PROBABILITY: f32 = 0.05;

/// The maximum probability of dropping a peer connection when it receives an
/// [`Overloaded`](crate::PeerError::Overloaded) error.
pub const MAX_OVERLOAD_DROP_PROBABILITY: f32 = 0.95;

lazy_static! {
/// The minimum network protocol version accepted by this crate for each network,
/// represented as a network upgrade.
Expand Down
125 changes: 105 additions & 20 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
//! And it's unclear if these assumptions match the `zcashd` implementation.
//! It should be refactored into a cleaner set of request/response pairs (#1515).
use std::{borrow::Cow, collections::HashSet, fmt, pin::Pin, sync::Arc};
use std::{borrow::Cow, collections::HashSet, fmt, pin::Pin, sync::Arc, time::Instant};

use futures::{
future::{self, Either},
prelude::*,
stream::Stream,
};
use rand::{thread_rng, Rng};
use tokio::time::{sleep, Sleep};
use tower::Service;
use tower::{load_shed::error::Overloaded, Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::{
Expand All @@ -25,7 +26,10 @@ use zebra_chain::{
};

use crate::{
constants,
constants::{
self, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
OVERLOAD_PROTECTION_INTERVAL,
},
meta_addr::MetaAddr,
peer::{
connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
Expand Down Expand Up @@ -508,6 +512,11 @@ pub struct Connection<S, Tx> {

/// The state for this peer, when the metrics were last updated.
pub(super) last_metrics_state: Option<Cow<'static, str>>,

/// The time of the last overload error response from the inbound
/// service to a request from this connection,
/// or None if this connection hasn't yet received an overload error.
last_overload_time: Option<Instant>,
}

impl<S, Tx> fmt::Debug for Connection<S, Tx> {
Expand Down Expand Up @@ -549,6 +558,7 @@ impl<S, Tx> Connection<S, Tx> {
connection_tracker,
metrics_label,
last_metrics_state: None,
last_overload_time: None,
}
}
}
Expand Down Expand Up @@ -1242,7 +1252,6 @@ where
/// of connected peers.
async fn drive_peer_request(&mut self, req: Request) {
trace!(?req);
use tower::{load_shed::error::Overloaded, ServiceExt};

// Add a metric for inbound requests
metrics::counter!(
Expand All @@ -1258,29 +1267,18 @@ where
tokio::task::yield_now().await;

if self.svc.ready().await.is_err() {
// Treat all service readiness errors as Overloaded
// TODO: treat `TryRecvError::Closed` in `Inbound::poll_ready` as a fatal error (#1655)
self.fail_with(PeerError::Overloaded);
self.fail_with(PeerError::ServiceShutdown);
return;
}

let rsp = match self.svc.call(req.clone()).await {
Err(e) => {
if e.is::<Overloaded>() {
tracing::info!(
remote_user_agent = ?self.connection_info.remote.user_agent,
negotiated_version = ?self.connection_info.negotiated_version,
peer = ?self.metrics_label,
last_peer_state = ?self.last_metrics_state,
// TODO: remove this detailed debug info once #6506 is fixed
remote_height = ?self.connection_info.remote.start_height,
cached_addrs = ?self.cached_addrs.len(),
connection_state = ?self.state,
"inbound service is overloaded, closing connection",
);
tracing::debug!("inbound service is overloaded, may close connection");

metrics::counter!("pool.closed.loadshed", 1);
self.fail_with(PeerError::Overloaded);
let now = Instant::now();

self.handle_inbound_overload(req, now).await;
} else {
// We could send a reject to the remote peer, but that might cause
// them to disconnect, and we might be using them to sync blocks.
Expand All @@ -1292,7 +1290,9 @@ where
client_receiver = ?self.client_rx,
"error processing peer request",
);
self.update_state_metrics(format!("In::Req::{}/Rsp::Error", req.command()));
}

return;
}
Ok(rsp) => rsp,
Expand All @@ -1307,6 +1307,7 @@ where
);
self.update_state_metrics(format!("In::Rsp::{}", rsp.command()));

// TODO: split response handler into its own method
match rsp.clone() {
Response::Nil => { /* generic success, do nothing */ }
Response::Peers(addrs) => {
Expand Down Expand Up @@ -1412,6 +1413,90 @@ where
// before checking the connection for the next inbound or outbound request.
tokio::task::yield_now().await;
}

/// Handle inbound service overload error responses by randomly terminating some connections.
///
/// # Security
///
/// When the inbound service is overloaded with requests, Zebra needs to drop some connections,
/// to reduce the load on the application. But dropping every connection that receives an
/// `Overloaded` error from the inbound service could cause Zebra to drop too many peer
/// connections, and stop itself downloading blocks or transactions.
///
/// Malicious or misbehaving peers can also overload the inbound service, and make Zebra drop
/// its connections to other peers.
///
/// So instead, Zebra drops some overloaded connections at random. If a connection has recently
/// overloaded the inbound service, it is more likely to be dropped. This makes it harder for a
/// single peer (or multiple peers) to perform a denial of service attack.
///
/// The inbound connection rate-limit also makes it hard for multiple peers to perform this
/// attack, because each inbound connection can only send one inbound request before its
/// probability of being disconnected increases.
async fn handle_inbound_overload(&mut self, req: Request, now: Instant) {
let prev = self.last_overload_time.replace(now);
let drop_connection_probability = overload_drop_connection_probability(now, prev);

if thread_rng().gen::<f32>() < drop_connection_probability {
metrics::counter!("pool.closed.loadshed", 1);

tracing::info!(
drop_connection_probability,
remote_user_agent = ?self.connection_info.remote.user_agent,
negotiated_version = ?self.connection_info.negotiated_version,
peer = ?self.metrics_label,
last_peer_state = ?self.last_metrics_state,
// TODO: remove this detailed debug info once #6506 is fixed
remote_height = ?self.connection_info.remote.start_height,
cached_addrs = ?self.cached_addrs.len(),
connection_state = ?self.state,
"inbound service is overloaded, closing connection",
);

self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Error", req.command()));
self.fail_with(PeerError::Overloaded);
} else {
self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Ignored", req.command()));
metrics::counter!("pool.ignored.loadshed", 1);
}
}
}

/// Returns the probability of dropping a connection where the last overload was at `prev`,
/// and the current overload is `now`.
///
/// # Security
///
/// Connections that haven't seen an overload error in the past OVERLOAD_PROTECTION_INTERVAL
/// have a small chance of being closed (MIN_OVERLOAD_DROP_PROBABILITY).
///
/// Connections that have seen a previous overload error in that time
/// have a higher chance of being dropped up to MAX_OVERLOAD_DROP_PROBABILITY.
/// This probability increases quadratically, so peers that send lots of inbound
/// requests are more likely to be dropped.
///
/// ## Examples
///
/// If a connection sends multiple overloads close together, it is very likely to be
/// disconnected. If a connection has two overloads multiple seconds apart, it is unlikely
/// to be disconnected.
fn overload_drop_connection_probability(now: Instant, prev: Option<Instant>) -> f32 {
let Some(prev) = prev else {
return MIN_OVERLOAD_DROP_PROBABILITY;
};

let protection_fraction_since_last_overload =
(now - prev).as_secs_f32() / OVERLOAD_PROTECTION_INTERVAL.as_secs_f32();

// Quadratically increase the disconnection probability for very recent overloads.
// Negative values are ignored by clamping to MIN_OVERLOAD_DROP_PROBABILITY.
let overload_fraction = protection_fraction_since_last_overload.powi(2);

let probability_range = MAX_OVERLOAD_DROP_PROBABILITY - MIN_OVERLOAD_DROP_PROBABILITY;
let raw_drop_probability =
MAX_OVERLOAD_DROP_PROBABILITY - (overload_fraction * probability_range);

raw_drop_probability.clamp(MIN_OVERLOAD_DROP_PROBABILITY, MAX_OVERLOAD_DROP_PROBABILITY)
}

impl<S, Tx> Connection<S, Tx> {
Expand Down
Loading

0 comments on commit 1510f7d

Please sign in to comment.