diff --git a/Cargo.lock b/Cargo.lock
index 285a2c844..45436947e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2173,6 +2173,7 @@ dependencies = [
 name = "network"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "async-trait",
  "backoff",
  "bincode",
diff --git a/network/Cargo.toml b/network/Cargo.toml
index 0d69ed9af..dad8a0b7f 100644
--- a/network/Cargo.toml
+++ b/network/Cargo.toml
@@ -20,6 +20,7 @@ crypto = { path = "../crypto" }
 tonic = { version = "0.7.2", features = ["tls"] }
 backoff = { version = "0.4.0", features = ["tokio"] }
 multiaddr = "0.14.0"
+anyhow = "1.0.58"
 mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "123c9e40b529315e1c1d91a54fb717111c3e349c" }
 workspace-hack = { version = "0.1", path = "../workspace-hack" }
diff --git a/network/src/bounded_executor.rs b/network/src/bounded_executor.rs
index 89a30910f..7fa899e2b 100644
--- a/network/src/bounded_executor.rs
+++ b/network/src/bounded_executor.rs
@@ -8,7 +8,7 @@
 //! concurrently when spawned through this executor, defined by the initial
 //! `capacity`.
 
-use futures::future::{Future, FutureExt};
+use futures::{future::Future, FutureExt};
 use std::sync::Arc;
 use tokio::{
     runtime::Handle,
@@ -18,6 +18,31 @@ use tokio::{
 
 use tracing::info;
 
+use thiserror::Error;
+
+#[derive(Error)]
+pub enum BoundedExecutionError<F>
+where
+    F: Future + Send + 'static,
+    F::Output: Send + 'static,
+{
+    #[error("Concurrent execution limit reached")]
+    Full(F),
+}
+
+impl<F> std::fmt::Debug for BoundedExecutionError<F>
+where
+    F: Future + Send + 'static,
+    F::Output: Send + 'static,
+{
+    // Elide the future to let this error be unwrapped
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Full(_f) => f.debug_tuple("Full").finish(),
+        }
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct BoundedExecutor {
     semaphore: Arc<Semaphore>,
@@ -35,6 +60,18 @@ impl BoundedExecutor {
         }
     }
 
+    // Acquires a permit from the semaphore, first gracefully,
+    // then queueing after logging that we're out of capacity.
+    async fn acquire_permit(semaphore: Arc<Semaphore>) -> OwnedSemaphorePermit {
+        match semaphore.clone().try_acquire_owned() {
+            Ok(p) => p,
+            Err(_) => {
+                info!("concurrent task limit reached, waiting...");
+                semaphore.acquire_owned().await.unwrap()
+            }
+        }
+    }
+
     /// Spawn a [`Future`] on the `BoundedExecutor`. This function is async and
     /// will block if the executor is at capacity until one of the other spawned
     /// futures completes. This function returns a [`JoinHandle`] that the caller
@@ -44,13 +81,7 @@ impl BoundedExecutor {
         F: Future + Send + 'static,
         F::Output: Send + 'static,
     {
-        let permit = match self.semaphore.clone().try_acquire_owned() {
-            Ok(p) => p,
-            Err(_) => {
-                info!("concurrent task limit reached, waiting...");
-                self.semaphore.clone().acquire_owned().await.unwrap()
-            }
-        };
+        let permit = Self::acquire_permit(self.semaphore.clone()).await;
 
         self.spawn_with_permit(f, permit)
     }
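Note: `acquire_permit` keeps the fast path cheap: `try_acquire_owned` succeeds immediately while capacity remains, and only the slow path logs and queues on the semaphore. A minimal consumer-side sketch of the resulting API, not part of this diff (the re-export path and the capacity of 4 are assumptions):

```rust
use network::BoundedExecutor; // re-export path assumed
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    // Cap in-flight tasks at 4; the fifth `spawn` waits for a free permit.
    let executor = BoundedExecutor::new(4, Handle::current());

    // `spawn` is async: it suspends while the executor is at capacity and
    // resumes once an earlier task finishes and releases its permit.
    let handle = executor.spawn(async { 21 * 2 }).await;
    assert_eq!(handle.await.unwrap(), 42);
}
```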
@@ -60,14 +91,14 @@ impl BoundedExecutor {
     /// caller attempted to spawn. Otherwise, this will spawn the future on the
     /// executor and send back a [`JoinHandle`] that the caller can `.await` on
     /// for the results of the [`Future`].
-    pub fn try_spawn<F>(&self, f: F) -> Result<JoinHandle<F::Output>, F>
+    pub fn try_spawn<F>(&self, f: F) -> Result<JoinHandle<F::Output>, BoundedExecutionError<F>>
     where
         F: Future + Send + 'static,
         F::Output: Send + 'static,
     {
-        match self.semaphore.clone().try_acquire_owned().ok() {
-            Some(permit) => Ok(self.spawn_with_permit(f, permit)),
-            None => Err(f),
+        match self.semaphore.clone().try_acquire_owned() {
+            Ok(permit) => Ok(self.spawn_with_permit(f, permit)),
+            Err(_) => Err(BoundedExecutionError::Full(f)),
         }
     }
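Because `Full` hands back the un-started future, a caller can fall back from the non-blocking to the waiting spawn without losing work. A sketch under that assumption (the helper name `spawn_or_wait` is hypothetical; the types above are assumed in scope):

```rust
use futures::future::Future;
use tokio::task::JoinHandle;

// Hypothetical fallback helper: try the non-blocking path first, then
// wait for capacity with the future recovered from the `Full` error.
async fn spawn_or_wait<F>(executor: &BoundedExecutor, f: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    match executor.try_spawn(f) {
        Ok(handle) => handle,
        // `Full(f)` returns ownership of the original future: nothing is lost.
        Err(BoundedExecutionError::Full(f)) => executor.spawn(f).await,
    }
}
```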
@@ -81,24 +112,115 @@ impl BoundedExecutor {
         F::Output: Send + 'static,
     {
         // Release the permit back to the semaphore when this task completes.
-        let f = f.map(move |ret| {
+        let f = Self::with_permit(f, spawn_permit);
+        self.executor.spawn(f)
+    }
+
+    // Returns a [`Future`] that complies with the `BoundedExecutor`. Once launched,
+    // it will block if the executor is at capacity, until one of the other spawned
+    // futures completes.
+    async fn run_on_semaphore<F>(semaphore: Arc<Semaphore>, f: F) -> F::Output
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        let permit = Self::acquire_permit(semaphore.clone()).await;
+        Self::with_permit(f, permit).await
+    }
+
+    /// Unconditionally spawns a task driving retries of a [`Future`] on the `BoundedExecutor`.
+    /// This [`Future`] will be executed in the form of attempts, one after the other, each run
+    /// on our bounded executor according to the provided [`crate::RetryConfig`].
+    ///
+    /// Each attempt is async and will block if the executor is at capacity until
+    /// one of the other attempts completes. In case the attempt completes with an error,
+    /// the driver completes a backoff (according to the retry configuration) without
+    /// holding a permit, before queueing an attempt on the executor again.
+    ///
+    /// This function returns a [`JoinHandle`] that the caller can `.await` on for
+    /// the results of the overall retry driver.
+    ///
+    /// TODO: this still spawns one task, unconditionally, per call.
+    /// We would instead like to have one central task that drives all retries
+    /// for the whole executor.
+    pub(crate) fn spawn_with_retries<F, Fut, T, E>(
+        &self,
+        retry_config: crate::RetryConfig,
+        mut f: F,
+    ) -> JoinHandle<Result<T, E>>
+    where
+        F: FnMut() -> Fut + Send + 'static,
+        Fut: Future<Output = Result<T, backoff::Error<E>>> + Send + 'static,
+        T: Send + 'static,
+        E: Send + 'static,
+    {
+        let retrier = {
+            let semaphore = self.semaphore.clone();
+
+            let executor = move || {
+                let semaphore = semaphore.clone();
+                BoundedExecutor::run_on_semaphore(semaphore, f())
+            };
+
+            retry_config.retry(executor)
+        };
+        self.executor.spawn(retrier)
+    }
+
+    // Equips a future with a final step that drops the held semaphore permit
+    async fn with_permit<F>(f: F, spawn_permit: OwnedSemaphorePermit) -> F::Output
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        f.map(|ret| {
             drop(spawn_permit);
             ret
-        });
-        self.executor.spawn(f)
+        })
+        .await
     }
 }
 
 #[cfg(test)]
 mod test {
+    use crate::RetryConfig;
+
     use super::*;
-    use futures::{channel::oneshot, executor::block_on, future::Future};
+    use futures::{channel::oneshot, executor::block_on, future::Future, FutureExt};
     use std::{
-        sync::atomic::{AtomicU32, Ordering},
+        sync::{
+            atomic::{AtomicU32, Ordering},
+            mpsc,
+        },
+        thread,
         time::Duration,
     };
     use tokio::{runtime::Runtime, time::sleep};
 
+    #[test]
+    fn try_spawn_panicking() {
+        let rt = Runtime::new().unwrap();
+        let executor = rt.handle().clone();
+        let executor = BoundedExecutor::new(1, executor);
+
+        // executor has a free slot, spawn should succeed
+        let fpanic = executor.try_spawn(panicking()).unwrap();
+        // this would return a JoinError::panic
+        block_on(fpanic).unwrap_err();
+
+        let (tx1, rx1) = oneshot::channel();
+        // the executor should not be full, because the permit for the panicking
+        // task should be dropped during unwinding
+        let f1 = executor.try_spawn(rx1).unwrap();
+
+        // cleanup
+        tx1.send(()).unwrap();
+        block_on(f1).unwrap().unwrap();
+    }
+
+    async fn panicking() {
+        panic!();
+    }
+
     #[test]
     fn try_spawn() {
         let rt = Runtime::new().unwrap();
@@ -114,8 +236,7 @@ mod test {
 
         // executor is full, try_spawn should return err and give back the task
         // we attempted to spawn
-
-        let rx2 = executor.try_spawn(rx2).unwrap_err();
+        let BoundedExecutionError::Full(rx2) = executor.try_spawn(rx2).unwrap_err();
 
         // complete f1 future, should open a free slot in executor
-
         tx1.send(()).unwrap();
         block_on(f1).unwrap().unwrap();
 
@@ -123,7 +244,6 @@ mod test {
         // should successfully spawn a new task now that the first is complete
-
         let f2 = executor.try_spawn(rx2).unwrap();
 
         // cleanup
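The contract of `spawn_with_retries` above is that a permit is held only while an attempt runs; the backoff sleeps between attempts are permit-free. A crate-internal sketch of calling it (the flaky callee and the 30-second bound are illustrative assumptions, not from this diff):

```rust
use std::time::Duration;
use tokio::task::JoinHandle;

// Hypothetical flaky operation standing in for a network call.
async fn flaky_call() -> Result<(), anyhow::Error> {
    Err(anyhow::anyhow!("transient failure"))
}

fn drive_retries(executor: &BoundedExecutor) -> JoinHandle<Result<(), anyhow::Error>> {
    let retry_config = crate::RetryConfig {
        // Bound the whole retry loop instead of retrying forever.
        retrying_max_elapsed_time: Some(Duration::from_secs(30)),
        ..Default::default()
    };
    executor.spawn_with_retries(retry_config, || async {
        // `From<E> for backoff::Error<E>` classifies the failure as
        // transient, i.e. worth another attempt after a backoff.
        flaky_call().await.map_err(backoff::Error::from)
    })
}
```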
@@ -132,8 +252,60 @@ mod test {
         block_on(f2).unwrap().unwrap();
     }
 
-    fn yield_task() -> impl Future<Output = ()> {
-        sleep(Duration::from_millis(1)).map(|_| ())
+    // ensure tasks spawned with retries do not hog the semaphore
+    #[test]
+    fn test_spawn_with_semaphore() {
+        // beware: the timeout is here to witness a failure rather than a hung test in case the
+        // executor does not work correctly.
+        panic_after(Duration::from_secs(10), || {
+            let rt = Runtime::new().unwrap();
+            let executor = rt.handle().clone();
+            let executor = BoundedExecutor::new(1, executor);
+
+            let infinite_retry_config = RetryConfig {
+                // Retry forever
+                retrying_max_elapsed_time: None,
+                ..Default::default()
+            };
+
+            // we can queue this future with infinite retries
+            let handle_infinite_fails =
+                executor.spawn_with_retries(infinite_retry_config, always_failing);
+
+            // check that we can still enqueue another successful task
+            let (tx1, rx1) = oneshot::channel();
+            let f1 = block_on(executor.spawn(rx1));
+
+            // complete the f1 future, should open a free slot in the executor
+            tx1.send(()).unwrap();
+            block_on(f1).unwrap().unwrap();
+
+            // cleanup
+            handle_infinite_fails.abort();
+        })
+    }
+
+    async fn always_failing() -> Result<(), backoff::Error<anyhow::Error>> {
+        Err(Into::into(anyhow::anyhow!("oops")))
+    }
+
+    fn panic_after<T, F>(d: Duration, f: F) -> T
+    where
+        T: Send + 'static,
+        F: FnOnce() -> T,
+        F: Send + 'static,
+    {
+        let (done_tx, done_rx) = mpsc::channel();
+        let handle = thread::spawn(move || {
+            let val = f();
+            done_tx.send(()).expect("Unable to send completion signal");
+            val
+        });
+
+        match done_rx.recv_timeout(d) {
+            Ok(_) => handle.join().expect("Thread panicked"),
+            Err(_) => panic!("Thread took too long"),
+        }
     }
 
     // spawn NUM_TASKS futures on a BoundedExecutor, ensuring that no more than
@@ -177,4 +349,8 @@ mod test {
             }
         }
     }
+
+    fn yield_task() -> impl Future<Output = ()> {
+        sleep(Duration::from_millis(1)).map(|_| ())
+    }
 }
diff --git a/network/src/lib.rs b/network/src/lib.rs
index 6d64cb472..77a51af19 100644
--- a/network/src/lib.rs
+++ b/network/src/lib.rs
@@ -20,6 +20,9 @@ pub use crate::{
     worker::WorkerNetwork,
 };
 
+// the result of our network messages
+pub type MessageResult = Result<tonic::Response<types::Empty>, anyhow::Error>;
+
 #[derive(Debug)]
 #[must_use]
 pub struct CancelHandler<T>(tokio::task::JoinHandle<T>);
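`CancelHandler` now carries the transport outcome instead of `()`, so callers can observe whether a send ultimately succeeded. A small sketch of consuming one, assuming (as `QuorumWaiter::waiter` below does) that awaiting the handler yields the inner `MessageResult`:

```rust
use network::{CancelHandler, MessageResult};

// Log the eventual outcome of a send driven by the retrying executor.
async fn log_outcome(handler: CancelHandler<MessageResult>) {
    match handler.await {
        Ok(response) => tracing::debug!("message delivered: {:?}", response),
        // With a bounded retry configuration, the retry driver can give up.
        Err(e) => tracing::warn!("message send failed permanently: {e}"),
    }
}
```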
diff --git a/network/src/primary.rs b/network/src/primary.rs
index 06b2e88d0..d9a78f2d6 100644
--- a/network/src/primary.rs
+++ b/network/src/primary.rs
@@ -1,9 +1,8 @@
 // Copyright (c) 2022, Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::{BoundedExecutor, CancelHandler, RetryConfig, MAX_TASK_CONCURRENCY};
+use crate::{BoundedExecutor, CancelHandler, MessageResult, RetryConfig, MAX_TASK_CONCURRENCY};
 use crypto::traits::VerifyingKey;
-use futures::FutureExt;
 use multiaddr::Multiaddr;
 use rand::{prelude::SliceRandom as _, rngs::SmallRng, SeedableRng as _};
 use std::collections::HashMap;
@@ -20,7 +19,12 @@ pub struct PrimaryNetwork {
     retry_config: RetryConfig,
     /// Small RNG just used to shuffle nodes and randomize connections (not crypto related).
     rng: SmallRng,
-    executor: BoundedExecutor,
+    // One bounded executor per address
+    executors: HashMap<Multiaddr, BoundedExecutor>,
+}
+
+fn default_executor() -> BoundedExecutor {
+    BoundedExecutor::new(MAX_TASK_CONCURRENCY, Handle::current())
 }
 
 impl Default for PrimaryNetwork {
@@ -36,7 +40,7 @@ impl Default for PrimaryNetwork {
             config: Default::default(),
             retry_config,
             rng: SmallRng::from_entropy(),
-            executor: BoundedExecutor::new(MAX_TASK_CONCURRENCY, Handle::current()),
+            executors: HashMap::new(),
         }
     }
 }
@@ -62,32 +66,43 @@ impl PrimaryNetwork {
         &mut self,
         address: Multiaddr,
         message: &PrimaryMessage<PublicKey>,
-    ) -> CancelHandler<()> {
+    ) -> CancelHandler<MessageResult> {
         let message =
             BincodeEncodedPayload::try_from(message).expect("Failed to serialize payload");
         self.send_message(address, message).await
     }
 
+    // Safety
+    // Since this spawns an unbounded task, this should be called in a time-restricted fashion.
+    // Here the callers are [`PrimaryNetwork::broadcast`] and [`PrimaryNetwork::send`],
+    // at N and K calls per round respectively
+    // (where N is the number of primaries and K the number of workers for this primary).
+    // See the TODO on spawn_with_retries for lifting this restriction.
     async fn send_message(
         &mut self,
         address: Multiaddr,
         message: BincodeEncodedPayload,
-    ) -> CancelHandler<()> {
-        let client = self.client(address);
+    ) -> CancelHandler<MessageResult> {
+        let client = self.client(address.clone());
+
+        let message_send = move || {
+            let mut client = client.clone();
+            let message = message.clone();
+
+            async move {
+                client.send_message(message).await.map_err(|e| {
+                    // this returns a backoff::Error::Transient
+                    // so that if a tonic::Status is returned, we retry
+                    Into::<backoff::Error<anyhow::Error>>::into(anyhow::Error::from(e))
+                })
+            }
+        };
+
         let handle = self
-            .executor
-            .spawn(
-                self.retry_config
-                    .retry(move || {
-                        let mut client = client.clone();
-                        let message = message.clone();
-                        async move { client.send_message(message).await.map_err(Into::into) }
-                    })
-                    .map(|response| {
-                        response.expect("we retry forever so this shouldn't fail");
-                    }),
-            )
-            .await;
+            .executors
+            .entry(address)
+            .or_insert_with(default_executor)
+            .spawn_with_retries(self.retry_config, message_send);
 
         CancelHandler(handle)
     }
 
@@ -96,7 +111,7 @@ impl PrimaryNetwork {
         &mut self,
         addresses: Vec<Multiaddr>,
         message: &PrimaryMessage<PublicKey>,
-    ) -> Vec<CancelHandler<()>> {
+    ) -> Vec<CancelHandler<MessageResult>> {
         let message =
             BincodeEncodedPayload::try_from(message).expect("Failed to serialize payload");
         let mut handlers = Vec::new();
@@ -114,8 +129,10 @@ impl PrimaryNetwork {
     ) -> JoinHandle<()> {
         let message =
             BincodeEncodedPayload::try_from(message).expect("Failed to serialize payload");
-        let mut client = self.client(address);
-        self.executor
+        let mut client = self.client(address.clone());
+        self.executors
+            .entry(address)
+            .or_insert_with(default_executor)
             .spawn(async move {
                 let _ = client.send_message(message).await;
             })
@@ -134,9 +151,11 @@ impl PrimaryNetwork {
         let mut handlers = Vec::new();
         for address in addresses {
             let handle = {
-                let mut client = self.client(address);
+                let mut client = self.client(address.clone());
                 let message = message.clone();
-                self.executor
+                self.executors
+                    .entry(address)
+                    .or_insert_with(default_executor)
                     .spawn(async move {
                         let _ = client.send_message(message).await;
                     })
@@ -162,9 +181,11 @@ impl PrimaryNetwork {
         let mut handlers = Vec::new();
         for address in addresses {
             let handle = {
-                let mut client = self.client(address);
+                let mut client = self.client(address.clone());
                 let message = message.clone();
-                self.executor
+                self.executors
+                    .entry(address)
+                    .or_insert_with(default_executor)
                     .spawn(async move {
                         let _ = client.send_message(message).await;
                     })
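The error mapping inside `message_send` is what keeps the retry loop alive: `From<E> for backoff::Error<E>` wraps the error in the `Transient` variant, which the retrier interprets as "back off and try again", while `Permanent` would end the loop immediately. An isolated sketch of that classification (the `InvalidArgument` branch is an illustrative refinement, not in this diff):

```rust
use backoff::Error as BackoffError;

fn classify(status: tonic::Status) -> BackoffError<anyhow::Error> {
    match status.code() {
        // A malformed request will never succeed, however often we retry.
        tonic::Code::InvalidArgument => {
            BackoffError::Permanent(anyhow::Error::from(status))
        }
        // Everything else is treated as transient, matching the diff's mapping.
        _ => BackoffError::from(anyhow::Error::from(status)),
    }
}
```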
diff --git a/network/src/worker.rs b/network/src/worker.rs
index c33de9528..f0592eb65 100644
--- a/network/src/worker.rs
+++ b/network/src/worker.rs
@@ -1,9 +1,8 @@
 // Copyright (c) 2022, Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::{BoundedExecutor, CancelHandler, RetryConfig, MAX_TASK_CONCURRENCY};
+use crate::{BoundedExecutor, CancelHandler, MessageResult, RetryConfig, MAX_TASK_CONCURRENCY};
 use crypto::traits::VerifyingKey;
-use futures::FutureExt;
 use multiaddr::Multiaddr;
 use rand::{prelude::SliceRandom as _, rngs::SmallRng, SeedableRng as _};
 use std::collections::HashMap;
@@ -17,7 +16,11 @@ pub struct WorkerNetwork {
     retry_config: RetryConfig,
     /// Small RNG just used to shuffle nodes and randomize connections (not crypto related).
     rng: SmallRng,
-    executor: BoundedExecutor,
+    executors: HashMap<Multiaddr, BoundedExecutor>,
+}
+
+fn default_executor() -> BoundedExecutor {
+    BoundedExecutor::new(MAX_TASK_CONCURRENCY, Handle::current())
 }
 
 impl Default for WorkerNetwork {
@@ -33,7 +36,7 @@ impl Default for WorkerNetwork {
             config: Default::default(),
             retry_config,
             rng: SmallRng::from_entropy(),
-            executor: BoundedExecutor::new(MAX_TASK_CONCURRENCY, Handle::current()),
+            executors: HashMap::new(),
         }
     }
 }
@@ -59,32 +62,43 @@ impl WorkerNetwork {
         &mut self,
         address: Multiaddr,
         message: &WorkerMessage<PublicKey>,
-    ) -> CancelHandler<()> {
+    ) -> CancelHandler<MessageResult> {
         let message =
             BincodeEncodedPayload::try_from(message).expect("Failed to serialize payload");
         self.send_message(address, message).await
     }
 
+    // Safety
+    // Since this spawns an unbounded task, this should be called in a time-restricted fashion.
+    // Here the callers are [`WorkerNetwork::broadcast`] and [`WorkerNetwork::send`],
+    // at N and K calls per round respectively
+    // (where N is the number of validators and K the number of batches to be reported to the primary).
+    // See the TODO on spawn_with_retries for lifting this restriction.
     async fn send_message(
         &mut self,
        address: Multiaddr,
         message: BincodeEncodedPayload,
-    ) -> CancelHandler<()> {
-        let client = self.client(address);
+    ) -> CancelHandler<MessageResult> {
+        let client = self.client(address.clone());
+
+        let message_send = move || {
+            let mut client = client.clone();
+            let message = message.clone();
+
+            async move {
+                client.send_message(message).await.map_err(|e| {
+                    // this returns a backoff::Error::Transient
+                    // so that if a tonic::Status is returned, we retry
+                    Into::<backoff::Error<anyhow::Error>>::into(anyhow::Error::from(e))
+                })
+            }
+        };
+
         let handle = self
-            .executor
-            .spawn(
-                self.retry_config
-                    .retry(move || {
-                        let mut client = client.clone();
-                        let message = message.clone();
-                        async move { client.send_message(message).await.map_err(Into::into) }
-                    })
-                    .map(|response| {
-                        response.expect("we retry forever so this shouldn't fail");
-                    }),
-            )
-            .await;
+            .executors
+            .entry(address)
+            .or_insert_with(default_executor)
+            .spawn_with_retries(self.retry_config, message_send);
 
         CancelHandler(handle)
     }
 
@@ -93,7 +107,7 @@ impl WorkerNetwork {
         &mut self,
         addresses: Vec<Multiaddr>,
         message: &WorkerMessage<PublicKey>,
-    ) -> Vec<CancelHandler<()>> {
+    ) -> Vec<CancelHandler<MessageResult>> {
         let message =
             BincodeEncodedPayload::try_from(message).expect("Failed to serialize payload");
         let mut handlers = Vec::new();
@@ -120,8 +134,10 @@ impl WorkerNetwork {
         message: T,
     ) -> JoinHandle<()> {
         let message = message.into();
-        let mut client = self.client(address);
-        self.executor
+        let mut client = self.client(address.clone());
+        self.executors
+            .entry(address)
+            .or_insert_with(default_executor)
             .spawn(async move {
                 let _ = client.send_message(message).await;
             })
diff --git a/primary/src/core.rs b/primary/src/core.rs
index c1f83f937..905c0541b 100644
--- a/primary/src/core.rs
+++ b/primary/src/core.rs
@@ -10,7 +10,7 @@ use crate::{
 use async_recursion::async_recursion;
 use config::{Committee, Epoch};
 use crypto::{traits::VerifyingKey, Hash as _, SignatureService};
-use network::{CancelHandler, PrimaryNetwork};
+use network::{CancelHandler, MessageResult, PrimaryNetwork};
 use std::{
     collections::{HashMap, HashSet},
     sync::{
@@ -86,7 +86,7 @@ pub struct Core<PublicKey: VerifyingKey> {
     /// A network sender to send the batches to the other workers.
     network: PrimaryNetwork,
     /// Keeps the cancel handlers of the messages we sent.
-    cancel_handlers: HashMap<Round, Vec<CancelHandler<()>>>,
+    cancel_handlers: HashMap<Round, Vec<CancelHandler<MessageResult>>>,
     /// Metrics handler
     metrics: Arc<PrimaryMetrics>,
 }
diff --git a/worker/src/quorum_waiter.rs b/worker/src/quorum_waiter.rs
index 952f1144d..60e4cf639 100644
--- a/worker/src/quorum_waiter.rs
+++ b/worker/src/quorum_waiter.rs
@@ -4,7 +4,7 @@
 use config::{Committee, Stake, WorkerId};
 use crypto::traits::VerifyingKey;
 use futures::stream::{futures_unordered::FuturesUnordered, StreamExt as _};
-use network::{CancelHandler, WorkerNetwork};
+use network::{CancelHandler, MessageResult, WorkerNetwork};
 use tokio::{
     sync::{
         mpsc::{Receiver, Sender},
@@ -62,8 +62,8 @@ impl<PublicKey: VerifyingKey> QuorumWaiter<PublicKey> {
     }
 
     /// Helper function. It waits for a future to complete and then delivers a value.
-    async fn waiter(wait_for: CancelHandler<()>, deliver: Stake) -> Stake {
-        wait_for.await;
+    async fn waiter(wait_for: CancelHandler<MessageResult>, deliver: Stake) -> Stake {
+        let _ = wait_for.await;
         deliver
     }
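Taken together, a primary now gets back one `CancelHandler<MessageResult>` per destination, and each destination's retries consume permits only on that destination's own executor, so a slow or dead peer can no longer starve sends to the others. A sketch of how a caller like `Core` might accumulate the handlers per round (field names from the diff; the surrounding method, `addresses`, `message`, and `round` are assumed in scope):

```rust
// Inside a Core method, after broadcasting a message for `round`:
let handlers = self.network.broadcast(addresses, &message).await;
self.cancel_handlers
    .entry(round)
    .or_insert_with(Vec::new)
    .extend(handlers);
```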