Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep track of background peer tasks #3253

Merged
merged 18 commits into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ pub use client::tests::ClientTestHarness;
#[cfg(not(test))]
use client::ClientRequest;
#[cfg(test)]
pub(crate) use client::{CancelHeartbeatTask, ClientRequest};
pub(crate) use client::ClientRequest;

use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};

pub(crate) use client::CancelHeartbeatTask;

pub use client::Client;
pub use connection::Connection;
pub use connector::{Connector, OutboundConnectorRequest};
Expand Down
90 changes: 70 additions & 20 deletions zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use futures::{
channel::{mpsc, oneshot},
future, ready,
stream::{Stream, StreamExt},
FutureExt,
};
use tokio::task::JoinHandle;
use tower::Service;

use crate::{
Expand Down Expand Up @@ -40,6 +42,12 @@ pub struct Client {

/// The peer connection's protocol version.
pub(crate) version: Version,

/// A handle to the task responsible for connecting to the peer.
pub(crate) connection_task: JoinHandle<()>,

/// A handle to the task responsible for sending periodic heartbeats.
pub(crate) heartbeat_task: JoinHandle<()>,
}

/// A signal sent by the [`Client`] half of a peer connection,
Expand Down Expand Up @@ -253,28 +261,70 @@ impl<T: std::fmt::Debug> Drop for MustUseOneshotSender<T> {
impl Client {
/// Check if this connection's heartbeat task has exited.
///
/// Two signals are checked, in order:
/// 1. whether the `shutdown_tx` cancel handle reports that its receiving side is gone, and
/// 2. whether the `heartbeat_task` join handle has completed.
///
/// Returns `Ok(())` while neither signal fired. Otherwise the result of
/// `set_task_exited_error` with [`PeerError::HeartbeatTaskExited`] is returned.
///
/// # Panics
///
/// If polling the heartbeat task's join handle yields an error.
fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> {
if let Poll::Ready(()) = self
let is_canceled = self
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
.shutdown_tx
.as_mut()
.expect("only taken on drop")
.poll_canceled(cx)
{
// Make sure there is an error in the slot
let heartbeat_error: SharedPeerError = PeerError::HeartbeatTaskExited.into();
let original_error = self.error_slot.try_update_error(heartbeat_error.clone());
debug!(
?original_error,
latest_error = ?heartbeat_error,
"client heartbeat task exited"
);
.is_ready();

// The cancel handle was dropped by its receiver, so treat the heartbeat task as exited.
if is_canceled {
return self.set_task_exited_error("heartbeat", PeerError::HeartbeatTaskExited);
}

// Polling the join handle also registers `cx` for wakeup when the task finishes.
match self.heartbeat_task.poll_unpin(cx) {
Poll::Pending => {
// Heartbeat task is still running.
Ok(())
}
Poll::Ready(Ok(())) => {
// Heartbeat task stopped unexpectedly, without panicking.
self.set_task_exited_error("heartbeat", PeerError::HeartbeatTaskExited)
}
Poll::Ready(Err(error)) => {
// Heartbeat task stopped unexpectedly with a panic.
//
// NOTE(review): a `JoinError` can also report cancellation (for example when the
// task is aborted), not only a panic — confirm that panicking is intended then.
panic!("heartbeat task has panicked: {}", error);
}
}
}

if let Err(AlreadyErrored { original_error }) = original_error {
Err(original_error)
} else {
Err(heartbeat_error)
/// Check if the connection's task has exited.
fn check_connection(&mut self, context: &mut Context<'_>) -> Result<(), SharedPeerError> {
match self.connection_task.poll_unpin(context) {
Poll::Pending => {
// Connection task is still running.
Ok(())
}
Poll::Ready(Ok(())) => {
// Connection task stopped unexpectedly, without panicking.
self.set_task_exited_error("connection", PeerError::ConnectionTaskExited)
}
Poll::Ready(Err(error)) => {
// Connection task stopped unexpectedly with a panic.
panic!("connection task has panicked: {}", error);
}
}
}

/// Properly update the error slot after a background task has unexpectedly stopped.
///
/// `task_name` is only used in the debug log message. `error` is converted into a
/// [`SharedPeerError`] and offered to the error slot.
///
/// If the slot already held an error, that earlier error is returned; otherwise the
/// newly stored task error is returned.
fn set_task_exited_error(
&mut self,
task_name: &str,
error: PeerError,
) -> Result<(), SharedPeerError> {
// Make sure there is an error in the slot
let task_error = SharedPeerError::from(error);
// A clone goes into the slot, so `task_error` can still be logged and returned below.
let original_error = self.error_slot.try_update_error(task_error.clone());
debug!(
?original_error,
latest_error = ?task_error,
"client {} task exited", task_name
);

// Prefer the error that was already in the slot over the new one.
if let Err(AlreadyErrored { original_error }) = original_error {
Err(original_error)
} else {
Ok(())
Err(task_error)
}
}

Expand Down Expand Up @@ -318,13 +368,15 @@ impl Service<Request> for Client {
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// `poll_canceled` schedules the client task for wakeup
// if the heartbeat task exits and drops the cancel handle.
// `check_heartbeat` and `check_connection` schedule the client task for wakeup
// if either task exits, or if the heartbeat task drops the cancel handle.
//
// `ready!` returns `Poll::Pending` when `server_tx` is unready, and
// schedules this task for wakeup.

let mut result = self.check_heartbeat(cx);
let mut result = self
.check_heartbeat(cx)
.and_then(|()| self.check_connection(cx));

if result.is_ok() {
result = ready!(self.poll_request(cx));
Expand All @@ -340,8 +392,6 @@ impl Service<Request> for Client {
}

fn call(&mut self, request: Request) -> Self::Future {
use futures::future::FutureExt;

let (tx, rx) = oneshot::channel();
// get the current Span to propagate it to the peer connection task.
// this allows the peer connection to enter the correct tracing context
Expand Down
103 changes: 99 additions & 4 deletions zebra-network/src/peer/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,41 @@

mod vectors;

use futures::channel::{mpsc, oneshot};
use std::time::Duration;

use futures::{
channel::{mpsc, oneshot},
future::{self, AbortHandle, Future, FutureExt},
};
use tokio::task::JoinHandle;

use crate::{
peer::{error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot},
protocol::external::types::Version,
};

/// The maximum time a mocked peer connection should be alive during a test.
///
/// This is the sleep duration of the fallback background task that is spawned when a test
/// does not supply its own task future.
const MAX_PEER_CONNECTION_TIME: Duration = Duration::from_secs(10);

/// A harness with mocked channels for testing a [`Client`] instance.
pub struct ClientTestHarness {
/// Receiving end of the channel on which the [`Client`] sends its [`ClientRequest`]s.
client_request_receiver: Option<mpsc::Receiver<ClientRequest>>,
/// Receiving end of the oneshot channel paired with the [`Client`]'s `shutdown_tx`.
shutdown_receiver: Option<oneshot::Receiver<CancelHeartbeatTask>>,
/// The error slot; the [`Client`] is built with a clone of it.
error_slot: ErrorSlot,
/// The protocol version the [`Client`] was built with.
version: Version,
/// Handle used to abort the mock connection background task.
connection_aborter: AbortHandle,
/// Handle used to abort the mock heartbeat background task.
heartbeat_aborter: AbortHandle,
}

impl ClientTestHarness {
/// Create a [`ClientTestHarnessBuilder`] instance to help create a new [`Client`] instance
/// and a [`ClientTestHarness`] to track it.
///
/// The returned builder has no version and no custom background task futures configured.
pub fn build() -> ClientTestHarnessBuilder {
ClientTestHarnessBuilder { version: None }
ClientTestHarnessBuilder {
version: None,
connection_task: None,
heartbeat_task: None,
}
}

/// Gets the peer protocol version associated to the [`Client`].
Expand Down Expand Up @@ -109,6 +124,22 @@ impl ClientTestHarness {
.try_update_error(error.into())
.expect("unexpected earlier error in error slot")
}

/// Aborts the mock background task that stands in for the peer connection task (the one
/// handling incoming remote requests and replies).
///
/// After requesting the abort, this yields to the runtime once.
pub async fn stop_connection_task(&self) {
    let aborter = &self.connection_aborter;
    aborter.abort();

    // Yield so the scheduler gets a chance to run the task and let it observe the abort.
    tokio::task::yield_now().await
}

/// Aborts the mock background task that stands in for the periodic heartbeat sender.
///
/// After requesting the abort, this yields to the runtime once.
pub async fn stop_heartbeat_task(&self) {
    let aborter = &self.heartbeat_aborter;
    aborter.abort();

    // Yield so the scheduler gets a chance to run the task and let it observe the abort.
    tokio::task::yield_now().await
}
}

/// The result of an attempt to receive a [`ClientRequest`] sent by the [`Client`] instance.
Expand Down Expand Up @@ -152,38 +183,102 @@ impl ReceiveRequestAttempt {
/// Mocked data is used to construct a real [`Client`] instance. The mocked data is initialized by
/// the [`ClientTestHarnessBuilder`], and can be accessed and changed through the
/// [`ClientTestHarness`].
pub struct ClientTestHarnessBuilder {
pub struct ClientTestHarnessBuilder<C = future::Ready<()>, H = future::Ready<()>> {
/// Future to run as the mock connection task; when `None`, a fallback task is spawned.
connection_task: Option<C>,
/// Future to run as the mock heartbeat task; when `None`, a fallback task is spawned.
heartbeat_task: Option<H>,
/// Mocked peer protocol version; when `None`, `Version(0)` is used.
version: Option<Version>,
}

impl ClientTestHarnessBuilder {
impl<C, H> ClientTestHarnessBuilder<C, H>
where
C: Future<Output = ()> + Send + 'static,
H: Future<Output = ()> + Send + 'static,
{
/// Configure the mocked version for the peer.
pub fn with_version(mut self, version: Version) -> Self {
self.version = Some(version);
self
}

/// Configure the mock connection task future to use.
pub fn with_connection_task<NewC>(
self,
connection_task: NewC,
) -> ClientTestHarnessBuilder<NewC, H> {
ClientTestHarnessBuilder {
connection_task: Some(connection_task),
heartbeat_task: self.heartbeat_task,
version: self.version,
}
}

/// Configure the mock heartbeat task future to use.
pub fn with_heartbeat_task<NewH>(
self,
heartbeat_task: NewH,
) -> ClientTestHarnessBuilder<C, NewH> {
ClientTestHarnessBuilder {
connection_task: self.connection_task,
heartbeat_task: Some(heartbeat_task),
version: self.version,
}
}

/// Build a [`Client`] instance with the mocked data and a [`ClientTestHarness`] to track it.
pub fn finish(self) -> (Client, ClientTestHarness) {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (client_request_sender, client_request_receiver) = mpsc::channel(1);
let error_slot = ErrorSlot::default();
let version = self.version.unwrap_or(Version(0));

let (connection_task, connection_aborter) =
Self::spawn_background_task_or_fallback(self.connection_task);
let (heartbeat_task, heartbeat_aborter) =
Self::spawn_background_task_or_fallback(self.heartbeat_task);

let client = Client {
shutdown_tx: Some(shutdown_sender),
server_tx: client_request_sender,
error_slot: error_slot.clone(),
version,
connection_task,
heartbeat_task,
};

let harness = ClientTestHarness {
client_request_receiver: Some(client_request_receiver),
shutdown_receiver: Some(shutdown_receiver),
error_slot,
version,
connection_aborter,
heartbeat_aborter,
};

(client, harness)
}

/// Spawn `task_future` as an abortable mock background task when one was supplied;
/// otherwise spawn a fallback task.
///
/// The fallback task sleeps for [`MAX_PEER_CONNECTION_TIME`] and then finishes.
fn spawn_background_task_or_fallback<T>(task_future: Option<T>) -> (JoinHandle<()>, AbortHandle)
where
    T: Future<Output = ()> + Send + 'static,
{
    if let Some(custom_future) = task_future {
        return Self::spawn_background_task(custom_future);
    }

    let fallback_future = tokio::time::sleep(MAX_PEER_CONNECTION_TIME);
    Self::spawn_background_task(fallback_future)
}

/// Spawn `task_future` on the Tokio runtime, wrapped so that it can be aborted.
///
/// Returns the spawned task's join handle and the handle that aborts it. Whether the future
/// ran to completion or was aborted is discarded, so the spawned task always outputs `()`.
fn spawn_background_task<T>(task_future: T) -> (JoinHandle<()>, AbortHandle)
where
    T: Future<Output = ()> + Send + 'static,
{
    let (abortable_future, aborter) = future::abortable(task_future);

    let join_handle = tokio::spawn(async move {
        // Finishing normally and being aborted are deliberately not distinguished.
        let _ = abortable_future.await;
    });

    (join_handle, aborter)
}
}
67 changes: 67 additions & 0 deletions zebra-network/src/peer/client/tests/vectors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Fixed peer [`Client`] test vectors.

use futures::poll;
use tower::ServiceExt;

use zebra_test::service_extensions::IsReady;
Expand Down Expand Up @@ -150,3 +151,69 @@ async fn client_service_drop_cleanup() {
assert!(!harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_closed());
}

/// Force the connection background task to stop, and check if the `Client` properly handles it.
///
/// No custom task futures are configured, so both background tasks are the harness's
/// fallback tasks.
#[tokio::test]
async fn client_service_handles_exited_connection_task() {
zebra_test::init();

let (mut client, mut harness) = ClientTestHarness::build().finish();

// Abort the mock connection task and yield so it can finish.
harness.stop_connection_task().await;

// The client should now be failed, with an error recorded in the shared slot.
assert!(client.is_failed().await);
assert!(harness.current_error().is_some());
// NOTE(review): these helpers are defined outside this view; presumably they check that
// the client closed its shutdown and request channels — confirm.
assert!(!harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_closed());
}

/// Force the connection background task to panic, and check if the `Client` propagates it.
///
/// The expected panic message is pinned to the one the `Client` raises when it detects the
/// panicked connection task, so an unrelated panic (for example in the test setup) does not
/// make this test pass by accident.
#[tokio::test]
#[should_panic(expected = "connection task has panicked")]
async fn client_service_propagates_panic_from_connection_task() {
    zebra_test::init();

    let (mut client, _harness) = ClientTestHarness::build()
        .with_connection_task(async move {
            panic!("connection task failure");
        })
        .finish();

    // Allow the custom connection task to run.
    tokio::task::yield_now().await;

    // Polling readiness makes the client check its background tasks.
    let _ = poll!(client.ready());
}

/// Force the heartbeat background task to stop, and check if the `Client` properly handles it.
///
/// No custom task futures are configured, so both background tasks are the harness's
/// fallback tasks.
#[tokio::test]
async fn client_service_handles_exited_heartbeat_task() {
zebra_test::init();

let (mut client, mut harness) = ClientTestHarness::build().finish();

// Abort the mock heartbeat task and yield so it can finish.
harness.stop_heartbeat_task().await;

// The client should now be failed, with an error recorded in the shared slot.
assert!(client.is_failed().await);
assert!(harness.current_error().is_some());
// NOTE(review): these helpers are defined outside this view; presumably they check that
// the client closed its shutdown and request channels — confirm.
assert!(!harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_closed());
}

/// Force the heartbeat background task to panic, and check if the `Client` propagates it.
///
/// The expected panic message is pinned to the one the `Client` raises when it detects the
/// panicked heartbeat task, so an unrelated panic (for example in the test setup) does not
/// make this test pass by accident.
#[tokio::test]
#[should_panic(expected = "heartbeat task has panicked")]
async fn client_service_propagates_panic_from_heartbeat_task() {
    zebra_test::init();

    let (mut client, _harness) = ClientTestHarness::build()
        .with_heartbeat_task(async move {
            panic!("heartbeat task failure");
        })
        .finish();

    // Allow the custom heartbeat task to run.
    tokio::task::yield_now().await;

    // Polling readiness makes the client check its background tasks.
    let _ = poll!(client.ready());
}
Loading