Skip to content

Commit

Permalink
fix(panic): Stop panicking when handling inbound connection handshakes (
Browse files Browse the repository at this point in the history
#6984)

* Remove a redundant outbound connector timeout

* Fix panics in inbound connection handshaker

* Refactor to simplify FuturesUnordered types
  • Loading branch information
teor2345 authored Jun 19, 2023
1 parent 73ce8fb commit 859353b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 24 deletions.
9 changes: 6 additions & 3 deletions zebra-network/src/peer/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ use std::{
};

use futures::prelude::*;
use tokio::{net::TcpStream, time::timeout};
use tokio::net::TcpStream;
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::chain_tip::{ChainTip, NoChainTip};

use crate::{
constants::HANDSHAKE_TIMEOUT,
peer::{Client, ConnectedAddr, Handshake, HandshakeRequest},
peer_set::ConnectionTracker,
BoxError, PeerSocketAddr, Request, Response,
Expand Down Expand Up @@ -93,8 +92,12 @@ where
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let connector_span = info_span!("connector", peer = ?connected_addr);

// # Security
//
// `zebra_network::init()` implements a connection timeout on this future.
// Any code outside this future does not have a timeout.
async move {
let tcp_stream = timeout(HANDSHAKE_TIMEOUT, TcpStream::connect(*addr)).await??;
let tcp_stream = TcpStream::connect(*addr).await?;
let client = hs
.oneshot(HandshakeRequest::<TcpStream> {
data_stream: tcp_stream,
Expand Down
4 changes: 4 additions & 0 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,10 @@ where
let relay = self.relay;
let minimum_peer_version = self.minimum_peer_version.clone();

// # Security
//
// `zebra_network::init()` implements a connection timeout on this future.
// Any code outside this future does not have a timeout.
let fut = async move {
debug!(
addr = ?connected_addr,
Expand Down
76 changes: 55 additions & 21 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
collections::{BTreeMap, HashSet},
convert::Infallible,
net::SocketAddr,
pin::Pin,
sync::Arc,
time::Duration,
};
Expand All @@ -15,13 +16,14 @@ use futures::{
future::{self, FutureExt},
sink::SinkExt,
stream::{FuturesUnordered, StreamExt},
TryFutureExt,
Future, TryFutureExt,
};
use rand::seq::SliceRandom;
use tokio::{
net::{TcpListener, TcpStream},
sync::broadcast,
time::{sleep, Instant},
task::JoinError,
time::{error::Elapsed, sleep, Instant},
};
use tokio_stream::wrappers::IntervalStream;
use tower::{
Expand Down Expand Up @@ -565,7 +567,8 @@ where
"Inbound Connections",
);

let mut handshakes = FuturesUnordered::new();
let mut handshakes: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>> =
FuturesUnordered::new();
// Keeping an unresolved future in the pool means the stream never terminates.
handshakes.push(future::pending().boxed());

Expand All @@ -575,8 +578,7 @@ where
biased;
next_handshake_res = handshakes.next() => match next_handshake_res {
// The task has already sent the peer change to the peer set.
Some(Ok(_)) => continue,
Some(Err(task_panic)) => panic!("panic in inbound handshake task: {task_panic:?}"),
Some(()) => continue,
None => unreachable!("handshakes never terminates, because it contains a future that never resolves"),
},

Expand Down Expand Up @@ -611,19 +613,37 @@ where
connection_tracker,
peerset_tx.clone(),
)
.await?;
.await?
.map(move |res| match res {
Ok(()) => (),
Err(e @ JoinError { .. }) => {
if e.is_panic() {
panic!("panic during inbound handshaking: {e:?}");
} else {
info!(
"task error during inbound handshaking: {e:?}, is Zebra shutting down?"
)
}
}
});

let handshake_timeout = tokio::time::timeout(
// Only trigger this timeout if the inner handshake timeout fails
HANDSHAKE_TIMEOUT + Duration::from_millis(500),
handshake_task,
)
.map(move |res| match res {
Ok(()) => (),
Err(_e @ Elapsed { .. }) => {
info!(
"timeout in spawned accept_inbound_handshake() task: \
inner task should have timeout out already"
);
}
});

// This timeout helps locate inbound peer connection hangs, see #6763 for details.
handshakes.push(Box::pin(
tokio::time::timeout(
// Only trigger this timeout if the inner handshake timeout fails
HANDSHAKE_TIMEOUT + Duration::from_millis(500),
handshake_task,
)
.inspect_err(|_elapsed| {
info!("timeout in spawned accept_inbound_handshake() task")
}),
));
handshakes.push(Box::pin(handshake_timeout));

// Rate-limit inbound connection handshakes.
// But sleep longer after a successful connection,
Expand Down Expand Up @@ -798,7 +818,9 @@ where
let candidates = Arc::new(futures::lock::Mutex::new(candidates));

// This contains both crawl and handshake tasks.
let mut handshakes = FuturesUnordered::new();
let mut handshakes: FuturesUnordered<
Pin<Box<dyn Future<Output = Result<CrawlerAction, BoxError>> + Send>>,
> = FuturesUnordered::new();
// <FuturesUnordered as Stream> returns None when empty.
// Keeping an unresolved future in the pool means the stream never terminates.
handshakes.push(future::pending().boxed());
Expand Down Expand Up @@ -905,8 +927,14 @@ where
})
.map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during handshaking: {e:?}");
Err(e @ JoinError {..}) => {
if e.is_panic() {
panic!("panic during outbound handshake: {e:?}");
} else {
info!("task error during outbound handshake: {e:?}, is Zebra shutting down?")
}
// Just fake it
Ok(HandshakeFinished)
}
})
.in_current_span();
Expand All @@ -929,8 +957,14 @@ where
})
.map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during TimerCrawl: {tick:?} {e:?}");
Err(e @ JoinError {..}) => {
if e.is_panic() {
panic!("panic during outbound TimerCrawl: {tick:?} {e:?}");
} else {
info!("task error during outbound TimerCrawl: {e:?}, is Zebra shutting down?")
}
// Just fake it
Ok(TimerCrawlFinished)
}
})
.in_current_span();
Expand Down

0 comments on commit 859353b

Please sign in to comment.