diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index cb83b558396..72471b63274 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -31,10 +31,7 @@ jobs: - uses: actions-rs/toolchain@v1.0.7 with: - # Pinned to workaround issue making cargo-llvm-cov fail, see - # https://github.com/taiki-e/cargo-llvm-cov/issues/128 - # TODO: restore to just `nightly` after it's fixed - toolchain: nightly-2022-01-14 + toolchain: nightly override: true profile: minimal components: llvm-tools-preview diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 979b2283bc3..d527cbd1564 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -43,7 +43,8 @@ pub const OUTBOUND_PEER_BIAS_DENOMINATOR: usize = 2; /// buffer adds up to 6 seconds worth of blocks to the queue. pub const PEERSET_BUFFER_SIZE: usize = 3; -/// The timeout for requests made to a remote peer. +/// The timeout for sending a message to a remote peer, +/// and receiving a response from a remote peer. pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20); /// The timeout for handshakes when connecting to new peers. diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index eb1de91a902..4fb9a72d633 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -28,8 +28,9 @@ use crate::{ constants, meta_addr::MetaAddr, peer::{ - error::AlreadyErrored, ClientRequestReceiver, ErrorSlot, InProgressClientRequest, - MustUseOneshotSender, PeerError, SharedPeerError, + connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver, + ConnectedAddr, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError, + SharedPeerError, }, peer_set::ConnectionTracker, protocol::{ @@ -39,6 +40,8 @@ use crate::{ BoxError, }; +mod peer_tx; + #[cfg(test)] mod tests; @@ -437,7 +440,7 @@ impl From for InboundMessage { } } -/// The state associated with a peer connection. +/// The channels, services, and associated state for a peer connection. pub struct Connection { /// The state of this connection's current request or response. pub(super) state: State, @@ -474,9 +477,7 @@ pub struct Connection { /// This channel accepts [`Message`]s. /// /// The corresponding peer message receiver is passed to [`Connection::run`]. - /// - /// TODO: add a timeout when sending messages to the remote peer (#3234) - pub(super) peer_tx: Tx, + pub(super) peer_tx: PeerTx, /// A connection tracker that reduces the open connection count when dropped. /// Used to limit the number of open connections in Zebra. @@ -498,6 +499,31 @@ pub struct Connection { pub(super) last_metrics_state: Option>, } +impl Connection { + /// Return a new connection from its channels, services, and shared state. 
+ pub(crate) fn new( + inbound_service: S, + client_rx: futures::channel::mpsc::Receiver, + error_slot: ErrorSlot, + peer_tx: Tx, + connection_tracker: ConnectionTracker, + connected_addr: ConnectedAddr, + ) -> Self { + Connection { + state: State::AwaitingRequest, + request_timer: None, + cached_addrs: Vec::new(), + svc: inbound_service, + client_rx: client_rx.into(), + error_slot, + peer_tx: peer_tx.into(), + connection_tracker, + metrics_label: connected_addr.get_transient_addr_label(), + last_metrics_state: None, + } + } +} + impl Connection where S: Service, @@ -702,7 +728,7 @@ where } Either::Left((Either::Right(_), _peer_fut)) => { trace!(parent: &span, "client request timed out"); - let e = PeerError::ClientRequestTimeout; + let e = PeerError::ConnectionReceiveTimeout; // Replace the state with a temporary value, // so we can take ownership of the response sender. diff --git a/zebra-network/src/peer/connection/peer_tx.rs b/zebra-network/src/peer/connection/peer_tx.rs new file mode 100644 index 00000000000..7e17196d95d --- /dev/null +++ b/zebra-network/src/peer/connection/peer_tx.rs @@ -0,0 +1,37 @@ +//! The peer message sender channel. + +use futures::{Sink, SinkExt}; + +use zebra_chain::serialization::SerializationError; + +use crate::{constants::REQUEST_TIMEOUT, protocol::external::Message, PeerError}; + +/// A wrapper type for a peer connection message sender. +/// +/// Used to apply a timeout to send messages. +#[derive(Clone, Debug)] +pub struct PeerTx { + /// A channel for sending Zcash messages to the connected peer. + /// + /// This channel accepts [`Message`]s. + inner: Tx, +} + +impl PeerTx +where + Tx: Sink + Unpin, +{ + /// Sends `msg` on `self.inner`, returning a timeout error if it takes too long. + pub async fn send(&mut self, msg: Message) -> Result<(), PeerError> { + tokio::time::timeout(REQUEST_TIMEOUT, self.inner.send(msg)) + .await + .map_err(|_| PeerError::ConnectionSendTimeout)? + .map_err(Into::into) + } +} + +impl From for PeerTx { + fn from(tx: Tx) -> Self { + PeerTx { inner: tx } + } +} diff --git a/zebra-network/src/peer/connection/tests.rs b/zebra-network/src/peer/connection/tests.rs index bf8434663bc..76d128179eb 100644 --- a/zebra-network/src/peer/connection/tests.rs +++ b/zebra-network/src/peer/connection/tests.rs @@ -8,9 +8,7 @@ use zebra_chain::serialization::SerializationError; use zebra_test::mock_service::MockService; use crate::{ - peer::{ - client::ClientRequestReceiver, connection::State, ClientRequest, Connection, ErrorSlot, - }, + peer::{ClientRequest, ConnectedAddr, Connection, ErrorSlot}, peer_set::ActiveConnectionCounter, protocol::external::Message, Request, Response, @@ -23,17 +21,20 @@ mod vectors; fn new_test_connection() -> ( Connection< MockService, - SinkMapErr, fn(mpsc::SendError) -> SerializationError>, + SinkMapErr, fn(mpsc::SendError) -> SerializationError>, >, mpsc::Sender, MockService, - mpsc::UnboundedReceiver, + mpsc::Receiver, ErrorSlot, ) { let mock_inbound_service = MockService::build().finish(); - let (client_tx, client_rx) = mpsc::channel(1); + let (client_tx, client_rx) = mpsc::channel(0); let shared_error_slot = ErrorSlot::default(); - let (peer_outbound_tx, peer_outbound_rx) = mpsc::unbounded(); + + // Normally the network has more capacity than the sender's single implicit slot, + // but the smaller capacity makes some tests easier. 
+ let (peer_tx, peer_rx) = mpsc::channel(0); let error_converter: fn(mpsc::SendError) -> SerializationError = |_| { io::Error::new( @@ -42,26 +43,22 @@ fn new_test_connection() -> ( ) .into() }; - let peer_tx = peer_outbound_tx.sink_map_err(error_converter); + let peer_tx = peer_tx.sink_map_err(error_converter); - let connection = Connection { - state: State::AwaitingRequest, - request_timer: None, - cached_addrs: Vec::new(), - svc: mock_inbound_service.clone(), - client_rx: ClientRequestReceiver::from(client_rx), - error_slot: shared_error_slot.clone(), + let connection = Connection::new( + mock_inbound_service.clone(), + client_rx, + shared_error_slot.clone(), peer_tx, - connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), - metrics_label: "test".to_string(), - last_metrics_state: None, - }; + ActiveConnectionCounter::new_counter().track_connection(), + ConnectedAddr::Isolated, + ); ( connection, client_tx, mock_inbound_service, - peer_outbound_rx, + peer_rx, shared_error_slot, ) } diff --git a/zebra-network/src/peer/connection/tests/prop.rs b/zebra-network/src/peer/connection/tests/prop.rs index 5c548d26632..7290c9ad40e 100644 --- a/zebra-network/src/peer/connection/tests/prop.rs +++ b/zebra-network/src/peer/connection/tests/prop.rs @@ -41,7 +41,7 @@ proptest! { runtime.block_on(async move { // The real stream and sink are from a split TCP connection, // but that doesn't change how the state machine behaves. - let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + let (mut peer_tx, peer_rx) = mpsc::channel(1); let ( connection, @@ -51,7 +51,7 @@ proptest! { shared_error_slot, ) = new_test_connection(); - let connection_task = tokio::spawn(connection.run(peer_inbound_rx)); + let connection_task = tokio::spawn(connection.run(peer_rx)); let response_to_first_request = send_block_request( first_block.hash(), @@ -71,13 +71,13 @@ proptest! { .await; // Reply to first request - peer_inbound_tx + peer_tx .send(Ok(Message::Block(first_block))) .await .expect("Failed to send response to first block request"); // Reply to second request - peer_inbound_tx + peer_tx .send(Ok(Message::Block(second_block.clone()))) .await .expect("Failed to send response to second block request"); @@ -100,7 +100,7 @@ proptest! { inbound_service.expect_no_requests().await?; // Stop the connection thread - mem::drop(peer_inbound_tx); + mem::drop(peer_tx); let connection_task_result = connection_task.await; prop_assert!(connection_task_result.is_ok()); @@ -114,11 +114,11 @@ proptest! { fn new_test_connection() -> ( Connection< MockService, - SinkMapErr, fn(mpsc::SendError) -> SerializationError>, + SinkMapErr, fn(mpsc::SendError) -> SerializationError>, >, mpsc::Sender, MockService, - mpsc::UnboundedReceiver, + mpsc::Receiver, ErrorSlot, ) { super::new_test_connection() @@ -127,7 +127,7 @@ fn new_test_connection() -> ( async fn send_block_request( block: block::Hash, client_requests: &mut mpsc::Sender, - outbound_messages: &mut mpsc::UnboundedReceiver, + outbound_messages: &mut mpsc::Receiver, ) -> oneshot::Receiver> { let (response_sender, response_receiver) = oneshot::channel(); diff --git a/zebra-network/src/peer/connection/tests/vectors.rs b/zebra-network/src/peer/connection/tests/vectors.rs index d4ba0060e80..b9bce22768f 100644 --- a/zebra-network/src/peer/connection/tests/vectors.rs +++ b/zebra-network/src/peer/connection/tests/vectors.rs @@ -1,15 +1,23 @@ //! Fixed test vectors for peer connections. //! -//! TODO: -//! - connection tests when awaiting requests (#3232) -//! 
- connection tests with closed/dropped peer_outbound_tx (#3233) +//! TODO: add tests for: +//! - inbound message as request +//! - inbound message, but not a request (or a response) -use futures::{channel::mpsc, sink::SinkMapErr, FutureExt, StreamExt}; +use std::{collections::HashSet, task::Poll, time::Duration}; +use futures::{ + channel::{mpsc, oneshot}, + sink::SinkMapErr, + FutureExt, StreamExt, +}; + +use tracing::Span; use zebra_chain::serialization::SerializationError; use zebra_test::mock_service::{MockService, PanicAssertion}; use crate::{ + constants::REQUEST_TIMEOUT, peer::{ connection::{Connection, State}, ClientRequest, ErrorSlot, @@ -18,18 +26,19 @@ use crate::{ PeerError, Request, Response, }; +/// Test that the connection run loop works as a future #[tokio::test] async fn connection_run_loop_ok() { zebra_test::init(); // The real stream and sink are from a split TCP connection, // but that doesn't change how the state machine behaves. - let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + let (peer_tx, peer_rx) = mpsc::channel(1); let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = new_test_connection(); - let connection = connection.run(peer_inbound_rx); + let connection = connection.run(peer_rx); // The run loop will wait forever for a request from Zebra or the peer, // without any errors, channel closes, or bytes written. @@ -41,57 +50,180 @@ async fn connection_run_loop_ok() { assert_eq!(result, None); let error = shared_error_slot.try_get_error(); - assert!( - matches!(error, None), - "unexpected connection error: {:?}", - error - ); + assert!(error.is_none(), "unexpected error: {:?}", error); assert!(!client_tx.is_closed()); - assert!(!peer_inbound_tx.is_closed()); + assert!(!peer_tx.is_closed()); + + inbound_service.expect_no_requests().await; // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert!(peer_outbound_messages.next().await.is_none()); + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); +} + +/// Test that the connection run loop works as a spawned task +#[tokio::test] +async fn connection_run_loop_spawn_ok() { + zebra_test::init(); + + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. + let (peer_tx, peer_rx) = mpsc::channel(1); + + let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = + new_test_connection(); + + // Spawn the connection run loop + let mut connection_join_handle = tokio::spawn(connection.run(peer_rx)); + + let error = shared_error_slot.try_get_error(); + assert!(error.is_none(), "unexpected error: {:?}", error); + + assert!(!client_tx.is_closed()); + assert!(!peer_tx.is_closed()); inbound_service.expect_no_requests().await; + + // Make sure that the connection did not: + // - panic, or + // - return. + // + // This test doesn't cause any fatal errors, + // so returning would be incorrect behaviour. + let connection_result = futures::poll!(&mut connection_join_handle); + assert!( + matches!(connection_result, Poll::Pending), + "unexpected run loop termination: {:?}", + connection_result, + ); + + // We need to abort the connection, because it holds a lock on the outbound channel. 
+ connection_join_handle.abort(); + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); } +/// Test that the connection run loop works as a spawned task with messages in and out +#[tokio::test] +async fn connection_run_loop_message_ok() { + zebra_test::init(); + + tokio::time::pause(); + + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. + let (mut peer_tx, peer_rx) = mpsc::channel(1); + + let ( + connection, + mut client_tx, + mut inbound_service, + mut peer_outbound_messages, + shared_error_slot, + ) = new_test_connection(); + + // Spawn the connection run loop + let mut connection_join_handle = tokio::spawn(connection.run(peer_rx)); + + // Simulate a message send and receive + let (request_tx, mut request_rx) = oneshot::channel(); + let request = ClientRequest { + request: Request::Peers, + tx: request_tx, + span: Span::current(), + }; + + client_tx + .try_send(request) + .expect("internal request channel is valid"); + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, Some(Message::GetAddr)); + + peer_tx + .try_send(Ok(Message::Addr(Vec::new()))) + .expect("peer inbound response channel is valid"); + + // give the event loop time to run + tokio::task::yield_now().await; + let peer_response = request_rx.try_recv(); + assert_eq!( + peer_response + .expect("peer internal response channel is valid") + .expect("response is present") + .expect("response is a message (not an error)"), + Response::Peers(Vec::new()), + ); + + let error = shared_error_slot.try_get_error(); + assert!(error.is_none(), "unexpected error: {:?}", error); + + assert!(!client_tx.is_closed()); + assert!(!peer_tx.is_closed()); + + inbound_service.expect_no_requests().await; + + // Make sure that the connection did not: + // - panic, or + // - return. + // + // This test doesn't cause any fatal errors, + // so returning would be incorrect behaviour. + let connection_result = futures::poll!(&mut connection_join_handle); + assert!( + matches!(connection_result, Poll::Pending), + "unexpected run loop termination: {:?}", + connection_result, + ); + + // We need to abort the connection, because it holds a lock on the outbound channel. + connection_join_handle.abort(); + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); +} + +/// Test that the connection run loop fails correctly when dropped #[tokio::test] async fn connection_run_loop_future_drop() { zebra_test::init(); // The real stream and sink are from a split TCP connection, // but that doesn't change how the state machine behaves. - let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + let (peer_tx, peer_rx) = mpsc::channel(1); let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = new_test_connection(); - let connection = connection.run(peer_inbound_rx); + let connection = connection.run(peer_rx); // now_or_never implicitly drops the connection future. 
let result = connection.now_or_never(); assert_eq!(result, None); let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); + assert_eq!( + error.expect("missing expected error").inner_debug(), + "ConnectionDropped", + ); assert!(client_tx.is_closed()); - assert!(peer_inbound_tx.is_closed()); - - assert!(peer_outbound_messages.next().await.is_none()); + assert!(peer_tx.is_closed()); inbound_service.expect_no_requests().await; + + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); } +/// Test that the connection run loop fails correctly when the internal client closes the connection channel #[tokio::test] async fn connection_run_loop_client_close() { zebra_test::init(); // The real stream and sink are from a split TCP connection, // but that doesn't change how the state machine behaves. - let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + let (peer_tx, peer_rx) = mpsc::channel(1); let ( connection, @@ -101,7 +233,7 @@ async fn connection_run_loop_client_close() { shared_error_slot, ) = new_test_connection(); - let connection = connection.run(peer_inbound_rx); + let connection = connection.run(peer_rx); // Explicitly close the client channel. client_tx.close_channel(); @@ -113,30 +245,35 @@ async fn connection_run_loop_client_close() { assert_eq!(result, Some(())); let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); + assert_eq!( + error.expect("missing expected error").inner_debug(), + "ClientDropped", + ); assert!(client_tx.is_closed()); - assert!(peer_inbound_tx.is_closed()); + assert!(peer_tx.is_closed()); + + inbound_service.expect_no_requests().await; // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert!(peer_outbound_messages.next().await.is_none()); - - inbound_service.expect_no_requests().await; + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); } +/// Test that the connection run loop fails correctly when the internal client drops the connection channel #[tokio::test] async fn connection_run_loop_client_drop() { zebra_test::init(); // The real stream and sink are from a split TCP connection, // but that doesn't change how the state machine behaves. - let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + let (peer_tx, peer_rx) = mpsc::channel(1); let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = new_test_connection(); - let connection = connection.run(peer_inbound_rx); + let connection = connection.run(peer_rx); // Drop the client channel. std::mem::drop(client_tx); @@ -148,32 +285,38 @@ async fn connection_run_loop_client_drop() { assert_eq!(result, Some(())); let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); + assert_eq!( + error.expect("missing expected error").inner_debug(), + "ClientDropped", + ); - assert!(peer_inbound_tx.is_closed()); + assert!(peer_tx.is_closed()); + + inbound_service.expect_no_requests().await; // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert!(peer_outbound_messages.next().await.is_none()); - - inbound_service.expect_no_requests().await; + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); } +/// Test that the connection run loop fails correctly when the peer channel is closed. 
+/// (We're not sure if tokio closes or drops the TcpStream when the TCP connection closes.) #[tokio::test] async fn connection_run_loop_inbound_close() { zebra_test::init(); // The real stream and sink are from a split TCP connection, // but that doesn't change how the state machine behaves. - let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + let (mut peer_tx, peer_rx) = mpsc::channel(1); let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = new_test_connection(); - let connection = connection.run(peer_inbound_rx); + let connection = connection.run(peer_rx); // Explicitly close the inbound peer channel. - peer_inbound_tx.close_channel(); + peer_tx.close_channel(); // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it. let connection = connection.shared(); @@ -182,33 +325,39 @@ async fn connection_run_loop_inbound_close() { assert_eq!(result, Some(())); let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); + assert_eq!( + error.expect("missing expected error").inner_debug(), + "ConnectionClosed", + ); assert!(client_tx.is_closed()); - assert!(peer_inbound_tx.is_closed()); + assert!(peer_tx.is_closed()); + + inbound_service.expect_no_requests().await; // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert!(peer_outbound_messages.next().await.is_none()); - - inbound_service.expect_no_requests().await; + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); } +/// Test that the connection run loop fails correctly when the peer channel is dropped +/// (We're not sure if tokio closes or drops the TcpStream when the TCP connection closes.) #[tokio::test] async fn connection_run_loop_inbound_drop() { zebra_test::init(); // The real stream and sink are from a split TCP connection, // but that doesn't change how the state machine behaves. - let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + let (peer_tx, peer_rx) = mpsc::channel(1); let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = new_test_connection(); - let connection = connection.run(peer_inbound_rx); + let connection = connection.run(peer_rx); // Drop the inbound peer channel. - std::mem::drop(peer_inbound_tx); + std::mem::drop(peer_tx); // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it. let connection = connection.shared(); @@ -217,24 +366,29 @@ async fn connection_run_loop_inbound_drop() { assert_eq!(result, Some(())); let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); + assert_eq!( + error.expect("missing expected error").inner_debug(), + "ConnectionClosed", + ); assert!(client_tx.is_closed()); + inbound_service.expect_no_requests().await; + // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert!(peer_outbound_messages.next().await.is_none()); - - inbound_service.expect_no_requests().await; + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); } +/// Test that the connection run loop fails correctly on internal connection errors. #[tokio::test] async fn connection_run_loop_failed() { zebra_test::init(); // The real stream and sink are from a split TCP connection, // but that doesn't change how the state machine behaves. 
- let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + let (peer_tx, peer_rx) = mpsc::channel(1); let ( mut connection, @@ -247,10 +401,10 @@ async fn connection_run_loop_failed() { // Simulate an internal connection error. connection.state = State::Failed; shared_error_slot - .try_update_error(PeerError::ClientRequestTimeout.into()) + .try_update_error(PeerError::Overloaded.into()) .expect("unexpected previous error in tests"); - let connection = connection.run(peer_inbound_rx); + let connection = connection.run(peer_rx); // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it. let connection = connection.shared(); @@ -261,27 +415,253 @@ async fn connection_run_loop_failed() { assert_eq!(result, Some(())); let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); + assert_eq!( + error.expect("missing expected error").inner_debug(), + "Overloaded", + ); assert!(client_tx.is_closed()); - assert!(peer_inbound_tx.is_closed()); + assert!(peer_tx.is_closed()); + + inbound_service.expect_no_requests().await; // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert!(peer_outbound_messages.next().await.is_none()); + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); +} + +/// Test that the connection run loop fails correctly when sending a message to a peer times out, +/// but we are not expecting a response message from the peer. +#[tokio::test] +async fn connection_run_loop_send_timeout_nil_response() { + zebra_test::init(); + + tokio::time::pause(); + + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. + let (peer_tx, peer_rx) = mpsc::channel(1); + + let ( + connection, + mut client_tx, + mut inbound_service, + mut peer_outbound_messages, + shared_error_slot, + ) = new_test_connection(); + + // Spawn the connection run loop + let mut connection_join_handle = tokio::spawn(connection.run(peer_rx)); + + // Simulate a message send timeout + let (request_tx, mut request_rx) = oneshot::channel(); + let request = ClientRequest { + request: Request::AdvertiseTransactionIds(HashSet::new()), + tx: request_tx, + span: Span::current(), + }; + + client_tx.try_send(request).expect("channel is valid"); + + // Make the send timeout + tokio::time::sleep(REQUEST_TIMEOUT + Duration::from_secs(1)).await; + + // Send timeouts close the connection + let error = shared_error_slot.try_get_error(); + assert_eq!( + error.expect("missing expected error").inner_debug(), + "ConnectionSendTimeout", + ); + + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, Some(Message::Inv(Vec::new()))); + + let peer_response = request_rx.try_recv(); + assert_eq!( + peer_response + .expect("peer internal response channel is valid") + .expect("response is present") + .expect_err("response is an error (not a message)") + .inner_debug(), + "ConnectionSendTimeout", + ); + + assert!(client_tx.is_closed()); + assert!(peer_tx.is_closed()); + + inbound_service.expect_no_requests().await; + + // Make sure that the connection finished, but did not panic. 
+ let connection_result = futures::poll!(&mut connection_join_handle); + assert!( + matches!(connection_result, Poll::Ready(Ok(()))), + "expected run loop termination, but run loop continued: {:?}", + connection_result, + ); + + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); +} + +/// Test that the connection run loop fails correctly when sending a message to a peer times out, +/// and we are expecting a response message from the peer. +#[tokio::test] +async fn connection_run_loop_send_timeout_expect_response() { + zebra_test::init(); + + tokio::time::pause(); + + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. + let (peer_tx, peer_rx) = mpsc::channel(1); + + let ( + connection, + mut client_tx, + mut inbound_service, + mut peer_outbound_messages, + shared_error_slot, + ) = new_test_connection(); + + // Spawn the connection run loop + let mut connection_join_handle = tokio::spawn(connection.run(peer_rx)); + + // Simulate a message send timeout + let (request_tx, mut request_rx) = oneshot::channel(); + let request = ClientRequest { + request: Request::Peers, + tx: request_tx, + span: Span::current(), + }; + + client_tx.try_send(request).expect("channel is valid"); + + // Make the send timeout + tokio::time::sleep(REQUEST_TIMEOUT + Duration::from_secs(1)).await; + + // Send timeouts close the connection + let error = shared_error_slot.try_get_error(); + assert_eq!( + error.expect("missing expected error").inner_debug(), + "ConnectionSendTimeout", + ); + + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, Some(Message::GetAddr)); + + let peer_response = request_rx.try_recv(); + assert_eq!( + peer_response + .expect("peer internal response channel is valid") + .expect("response is present") + .expect_err("response is an error (not a message)") + .inner_debug(), + "ConnectionSendTimeout", + ); + + assert!(client_tx.is_closed()); + assert!(peer_tx.is_closed()); + + inbound_service.expect_no_requests().await; + + // Make sure that the connection finished, but did not panic. + let connection_result = futures::poll!(&mut connection_join_handle); + assert!( + matches!(connection_result, Poll::Ready(Ok(()))), + "expected run loop termination, but run loop continued: {:?}", + connection_result, + ); + + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); +} + +/// Test that the connection run loop continues but returns an error to the client, +/// when a peer accepts a message, but does not send an expected response. +#[tokio::test] +async fn connection_run_loop_receive_timeout() { + zebra_test::init(); + + tokio::time::pause(); + + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. 
+ let (peer_tx, peer_rx) = mpsc::channel(1); + + let ( + connection, + mut client_tx, + mut inbound_service, + mut peer_outbound_messages, + shared_error_slot, + ) = new_test_connection(); + + // Spawn the connection run loop + let mut connection_join_handle = tokio::spawn(connection.run(peer_rx)); + + // Simulate a message receive timeout + let (request_tx, mut request_rx) = oneshot::channel(); + let request = ClientRequest { + request: Request::Peers, + tx: request_tx, + span: Span::current(), + }; + + client_tx.try_send(request).expect("channel is valid"); + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, Some(Message::GetAddr)); + + // Make the receive timeout + tokio::time::sleep(REQUEST_TIMEOUT + Duration::from_secs(1)).await; + + // Receive timeouts don't close the connection + let error = shared_error_slot.try_get_error(); + assert!(error.is_none(), "unexpected error: {:?}", error); + + assert!(!client_tx.is_closed()); + assert!(!peer_tx.is_closed()); + + let peer_response = request_rx.try_recv(); + assert_eq!( + peer_response + .expect("peer internal response channel is valid") + .expect("response is present") + .expect_err("response is an error (not a message)") + .inner_debug(), + "ConnectionReceiveTimeout", + ); inbound_service.expect_no_requests().await; + + // Make sure that the connection did not: + // - panic, or + // - return. + // + // This test doesn't cause any fatal errors, + // so returning would be incorrect behaviour. + let connection_result = futures::poll!(&mut connection_join_handle); + assert!( + matches!(connection_result, Poll::Pending), + "unexpected run loop termination: {:?}", + connection_result, + ); + + // We need to abort the connection, because it holds a lock on the outbound channel. + connection_join_handle.abort(); + let outbound_message = peer_outbound_messages.next().await; + assert_eq!(outbound_message, None); } /// Creates a new [`Connection`] instance for unit tests. fn new_test_connection() -> ( Connection< MockService, - SinkMapErr, fn(mpsc::SendError) -> SerializationError>, + SinkMapErr, fn(mpsc::SendError) -> SerializationError>, >, mpsc::Sender, MockService, - mpsc::UnboundedReceiver, + mpsc::Receiver, ErrorSlot, ) { super::new_test_connection() diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index fdcf6bf1a62..2122bf21c7d 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -21,6 +21,15 @@ where } } +impl SharedPeerError { + /// Returns a debug-formatted string describing the inner [`PeerError`]. + /// + /// Unfortunately, [`TracedError`] makes it impossible to get a reference to the original error. + pub fn inner_debug(&self) -> String { + format!("{:?}", self.0.as_ref()) + } +} + /// An error related to peer connection handling. #[derive(Error, Debug)] #[allow(dead_code)] @@ -49,9 +58,13 @@ pub enum PeerError { #[error("Internal heartbeat task exited")] HeartbeatTaskExited, - /// The remote peer did not respond to a [`peer::Client`] request in time. - #[error("Client request timed out")] - ClientRequestTimeout, + /// Sending a message to a remote peer took too long. + #[error("Sending Client request timed out")] + ConnectionSendTimeout, + + /// Receiving a response to a [`peer::Client`] request took too long. + #[error("Receiving client response timed out")] + ConnectionReceiveTimeout, /// A serialization error occurred while reading or writing a message. 
#[error("Serialization error: {0}")] @@ -82,7 +95,8 @@ impl PeerError { PeerError::ClientCancelledHeartbeatTask => "ClientCancelledHeartbeatTask".into(), PeerError::HeartbeatTaskExited => "HeartbeatTaskExited".into(), PeerError::ConnectionTaskExited => "ConnectionTaskExited".into(), - PeerError::ClientRequestTimeout => "ClientRequestTimeout".into(), + PeerError::ConnectionSendTimeout => "ConnectionSendTimeout".into(), + PeerError::ConnectionReceiveTimeout => "ConnectionReceiveTimeout".into(), // TODO: add error kinds or summaries to `SerializationError` PeerError::Serialization(inner) => format!("Serialization({})", inner).into(), PeerError::DuplicateHandshake => "DuplicateHandshake".into(), diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 47cbfaaacf2..fa8244c676b 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -958,19 +958,14 @@ where }) .boxed(); - use super::connection; - let server = Connection { - state: connection::State::AwaitingRequest, - request_timer: None, - cached_addrs: Vec::new(), - svc: inbound_service, - client_rx: server_rx.into(), - error_slot: error_slot.clone(), + let server = Connection::new( + inbound_service, + server_rx, + error_slot.clone(), peer_tx, connection_tracker, - metrics_label: connected_addr.get_transient_addr_label(), - last_metrics_state: None, - }; + connected_addr, + ); let connection_task = tokio::spawn( server