
[network] Don't back off forever on the Semaphore #559

Merged
merged 7 commits on Jul 22, 2022
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions network/Cargo.toml
@@ -20,6 +20,7 @@ crypto = { path = "../crypto" }
tonic = { version = "0.7.2", features = ["tls"] }
backoff = { version = "0.4.0", features = ["tokio"] }
multiaddr = "0.14.0"
anyhow = "1.0.58"

mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "123c9e40b529315e1c1d91a54fb717111c3e349c" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
220 changes: 198 additions & 22 deletions network/src/bounded_executor.rs
@@ -8,7 +8,7 @@
//! concurrently when spawned through this executor, defined by the initial
//! `capacity`.

use futures::{future::Future, FutureExt};
use std::sync::Arc;
use tokio::{
runtime::Handle,
@@ -18,6 +18,31 @@ use tokio::{

use tracing::info;

use thiserror::Error;

#[derive(Error)]
pub enum BoundedExecutionError<F>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
#[error("Concurrent execution limit reached")]
Full(F),
}

impl<F> std::fmt::Debug for BoundedExecutionError<F>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
// Elide the future to let this be unwrapped
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Full(_f) => f.debug_tuple("Full").finish(),
}
}
}

#[derive(Clone, Debug)]
pub struct BoundedExecutor {
semaphore: Arc<Semaphore>,
@@ -35,6 +60,18 @@ impl BoundedExecutor {
}
}

// Acquires a permit from the semaphore: first opportunistically via
// `try_acquire_owned`, then, after logging that we're out of capacity,
// by queueing on the semaphore.
async fn acquire_permit(semaphore: Arc<Semaphore>) -> OwnedSemaphorePermit {
match semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => {
info!("concurrent task limit reached, waiting...");
semaphore.acquire_owned().await.unwrap()
}
}
}

/// Spawn a [`Future`] on the `BoundedExecutor`. This function is async and
/// will block if the executor is at capacity until one of the other spawned
/// futures completes. This function returns a [`JoinHandle`] that the caller
/// can `.await` on for the results of the [`Future`].
@@ -44,13 +81,7 @@
pub async fn spawn<F>(&self, f: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let permit = Self::acquire_permit(self.semaphore.clone()).await;

self.spawn_with_permit(f, permit)
}
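
// Illustrative usage sketch (hypothetical caller code, not part of this
// change): `spawn` waits for a permit when the executor is saturated, then
// hands back a `JoinHandle` for the spawned future.
//
// async fn spawn_example(executor: &BoundedExecutor) {
//     let handle = executor.spawn(async { 40 + 2 }).await;
//     assert_eq!(handle.await.unwrap(), 42);
// }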
@@ -60,14 +91,14 @@
/// caller attempted to spawn. Otherwise, this will spawn the future on the
/// executor and send back a [`JoinHandle`] that the caller can `.await` on
/// for the results of the [`Future`].
pub fn try_spawn<F>(&self, f: F) -> Result<JoinHandle<F::Output>, BoundedExecutionError<F>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self.semaphore.clone().try_acquire_owned() {
Ok(permit) => Ok(self.spawn_with_permit(f, permit)),
Err(_) => Err(BoundedExecutionError::Full(f)),
}
}
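
// Illustrative usage sketch (hypothetical caller code, not part of this
// change): on `Full`, the rejected future comes back inside the error, so
// the caller keeps ownership and can resubmit it once capacity frees up.
//
// fn try_spawn_example(executor: &BoundedExecutor) {
//     match executor.try_spawn(async { "work" }) {
//         Ok(handle) => { /* `.await` the handle for the result */ }
//         Err(BoundedExecutionError::Full(f)) => {
//             // executor saturated: we still own `f` and may retry it later
//             drop(f);
//         }
//     }
// }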

@@ -81,24 +112,115 @@
pub fn spawn_with_permit<F>(
&self,
f: F,
spawn_permit: OwnedSemaphorePermit,
) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
// Release the permit back to the semaphore when this task completes.
let f = Self::with_permit(f, spawn_permit);
self.executor.spawn(f)
}

// Returns a [`Future`] that respects the `BoundedExecutor`'s concurrency
// limit. Once launched, it will block if the executor is at capacity, until
// one of the other spawned futures completes.
async fn run_on_semaphore<F>(semaphore: Arc<Semaphore>, f: F) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let permit = Self::acquire_permit(semaphore.clone()).await;
Self::with_permit(f, permit).await
}

/// Unconditionally spawns a task driving retries of a [`Future`] on the `BoundedExecutor`.
/// This [`Future`] will be executed in the form of attempts, one after the other, run on
/// our bounded executor, each according to the provided [`crate::RetryConfig`].
///
/// Each attempt is async and will block if the executor is at capacity until
/// one of the other attempts completes. If an attempt completes with an error,
/// the driver waits out a backoff (per the retry configuration) without holding
/// a permit, before queueing another attempt on the executor.
///
/// This function returns a [`JoinHandle`] that the caller can `.await` on for
/// the results of the overall retry driver.
///
/// TODO: this still spawns one task, unconditionally, per call.
/// We would instead like to have one central task that drives all retries
/// for the whole executor.
pub(crate) fn spawn_with_retries<F, Fut, T, E>(
&self,
retry_config: crate::RetryConfig,
mut f: F,
) -> JoinHandle<Result<T, E>>
where
F: FnMut() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, backoff::Error<E>>> + Send + 'static,
T: Send + 'static,
E: Send + 'static,
{
let retrier = {
let semaphore = self.semaphore.clone();

let executor = move || {
let semaphore = semaphore.clone();
BoundedExecutor::run_on_semaphore(semaphore, f())
};

retry_config.retry(executor)
};
self.executor.spawn(retrier)
}
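
// Illustrative usage sketch (hypothetical caller code, not part of this
// change): retry a fallible operation indefinitely. The backoff between
// attempts happens *without* holding a permit, which is the point of this PR:
// a failing task no longer occupies an executor slot while it waits.
//
// fn retry_example(executor: &BoundedExecutor) -> JoinHandle<Result<(), anyhow::Error>> {
//     let retry_config = crate::RetryConfig {
//         retrying_max_elapsed_time: None, // retry indefinitely
//         ..Default::default()
//     };
//     executor.spawn_with_retries(retry_config, || async {
//         // `From<E> for backoff::Error<E>` marks the failure as transient
//         Err::<(), _>(backoff::Error::from(anyhow::anyhow!("try again")))
//     })
// }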

// Equips a future with a final step that drops the held semaphore permit
async fn with_permit<F>(f: F, spawn_permit: OwnedSemaphorePermit) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
f.map(|ret| {
drop(spawn_permit);
ret
})
.await
}
}

#[cfg(test)]
mod test {
use crate::RetryConfig;

use super::*;
use futures::{channel::oneshot, executor::block_on, future::Future, FutureExt};
use std::{
sync::{
atomic::{AtomicU32, Ordering},
mpsc,
},
thread,
time::Duration,
};
use tokio::{runtime::Runtime, time::sleep};

#[test]
fn try_spawn_panicking() {
let rt = Runtime::new().unwrap();
let executor = rt.handle().clone();
let executor = BoundedExecutor::new(1, executor);

// executor has a free slot, spawn should succeed
let fpanic = executor.try_spawn(panicking()).unwrap();
// awaiting the handle returns a JoinError, since the task panicked
block_on(fpanic).unwrap_err();

let (tx1, rx1) = oneshot::channel();
// the executor should not be full, because the permit for the panicking task should have been dropped during unwinding
let f1 = executor.try_spawn(rx1).unwrap();

// cleanup
tx1.send(()).unwrap();
block_on(f1).unwrap().unwrap();
}

async fn panicking() {
panic!();
}

#[test]
fn try_spawn() {
let rt = Runtime::new().unwrap();
Expand All @@ -114,16 +236,14 @@ mod test {

// executor is full, try_spawn should return err and give back the task
// we attempted to spawn

let BoundedExecutionError::Full(rx2) = executor.try_spawn(rx2).unwrap_err();

// complete f1 future, should open a free slot in executor

tx1.send(()).unwrap();
block_on(f1).unwrap().unwrap();

// should successfully spawn a new task now that the first is complete

let f2 = executor.try_spawn(rx2).unwrap();

// cleanup
Expand All @@ -132,8 +252,60 @@ mod test {
block_on(f2).unwrap().unwrap();
}

// ensure tasks spawned with retries do not hog the semaphore
#[test]
fn test_spawn_with_semaphore() {
// beware: the timeout is here to witness a failure rather than a hung test in case the
// executor does not work correctly.
panic_after(Duration::from_secs(10), || {
let rt = Runtime::new().unwrap();
let executor = rt.handle().clone();
let executor = BoundedExecutor::new(1, executor);

let infinite_retry_config = RetryConfig {
// Retry forever
retrying_max_elapsed_time: None,
..Default::default()
};

// we can queue this future with infinite retries
let handle_infinite_fails =
executor.spawn_with_retries(infinite_retry_config, always_failing);

// check we can still enqueue another successful task
let (tx1, rx1) = oneshot::channel();
let f1 = block_on(executor.spawn(rx1));

// complete f1 future, should open a free slot in executor
tx1.send(()).unwrap();
block_on(f1).unwrap().unwrap();

// cleanup
handle_infinite_fails.abort();
})
}

async fn always_failing() -> Result<(), backoff::Error<anyhow::Error>> {
Err(Into::into(anyhow::anyhow!("oops")))
}

fn panic_after<T, F>(d: Duration, f: F) -> T
where
T: Send + 'static,
F: FnOnce() -> T,
F: Send + 'static,
{
let (done_tx, done_rx) = mpsc::channel();
let handle = thread::spawn(move || {
let val = f();
done_tx.send(()).expect("Unable to send completion signal");
val
});

match done_rx.recv_timeout(d) {
Ok(_) => handle.join().expect("Thread panicked"),
Err(_) => panic!("Thread took too long"),
}
}

// spawn NUM_TASKS futures on a BoundedExecutor, ensuring that no more than
@@ -177,4 +349,8 @@
}
}
}

fn yield_task() -> impl Future<Output = ()> {
sleep(Duration::from_millis(1)).map(|_| ())
}
}
3 changes: 3 additions & 0 deletions network/src/lib.rs
@@ -20,6 +20,9 @@ pub use crate::{
worker::WorkerNetwork,
};

// the result of our network messages
pub type MessageResult = Result<tonic::Response<types::Empty>, anyhow::Error>;

#[derive(Debug)]
#[must_use]
pub struct CancelHandler<T>(tokio::task::JoinHandle<T>);