Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace once_cell with async-lock #29

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ exclude = ["/.*"]

[dependencies]
async-channel = "1.4.0"
async-lock = "2.6"
async-task = "4.0.2"
atomic-waker = "1.0.0"
fastrand = "1.3.4"
futures-lite = "1.11.0"
once_cell = "1.4.1"
log = "0.4.11"
54 changes: 31 additions & 23 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ use std::thread;
use std::time::Duration;

use async_channel::{bounded, Receiver};
use async_lock::OnceCell;
use async_task::Runnable;
use atomic_waker::AtomicWaker;
use futures_lite::{future, prelude::*, ready};
use once_cell::sync::Lazy;

#[doc(no_inline)]
pub use async_task::Task;
Expand All @@ -113,27 +113,13 @@ const MAX_MAX_THREADS: usize = 10000;
/// Env variable that allows to override default value for max threads.
const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";

/// Lazily initialized global executor.
static EXECUTOR: Lazy<Executor> = Lazy::new(|| Executor {
inner: Mutex::new(Inner {
idle_count: 0,
thread_count: 0,
queue: VecDeque::new(),
}),
cvar: Condvar::new(),
thread_limit: Executor::max_threads(),
});

/// The blocking executor.
struct Executor {
/// Inner state of the executor.
inner: Mutex<Inner>,

/// Used to put idle threads to sleep and wake them up when new work comes in.
cvar: Condvar,

/// Maximum number of threads in the pool
thread_limit: usize,
}

/// Inner state of the blocking executor.
Expand All @@ -148,6 +134,9 @@ struct Inner {
/// This is the number of idle threads + the number of active threads.
thread_count: usize,

/// Maximum number of threads in the pool
thread_limit: usize,

/// The queue of blocking tasks.
queue: VecDeque<Runnable>,
}
Expand All @@ -162,11 +151,27 @@ impl Executor {
Err(_) => DEFAULT_MAX_THREADS,
}
}

/// Get the global blocking task executor.
fn get() -> &'static Executor {
static EXECUTOR: OnceCell<Executor> = OnceCell::new();

EXECUTOR.get_or_init_blocking(|| Executor {
inner: Mutex::new(Inner {
idle_count: 0,
thread_count: 0,
queue: VecDeque::new(),
thread_limit: Executor::max_threads(),
}),
cvar: Condvar::new(),
})
}

/// Spawns a future onto this executor.
///
/// Returns a [`Task`] handle for the spawned task.
fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
let (runnable, task) = async_task::spawn(future, |r| EXECUTOR.schedule(r));
let (runnable, task) = async_task::spawn(future, |r| Executor::get().schedule(r));
runnable.schedule();
task
}
Expand Down Expand Up @@ -223,24 +228,27 @@ impl Executor {
fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
// If runnable tasks greatly outnumber idle threads and there aren't too many threads
// already, then be aggressive: wake all idle threads and spawn one more thread.
while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < EXECUTOR.thread_limit
{
while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < inner.thread_limit {
// The new thread starts in idle state.
inner.idle_count += 1;
inner.thread_count += 1;

// Notify all existing idle threads because we need to hurry up.
self.cvar.notify_all();

// Generate a new thread ID.
static ID: AtomicUsize = AtomicUsize::new(1);
let id = ID.fetch_add(1, Ordering::Relaxed);

// Spawn the new thread.
thread::Builder::new()
if let Err(e) = thread::Builder::new()
.name(format!("blocking-{}", id))
.spawn(move || self.main_loop())
.unwrap();
{
log::error!("Failed to spawn new blocking thread: {:?}", e);
inner.thread_limit = inner.thread_count;
break;
}

// Notify all existing idle threads because we need to hurry up.
self.cvar.notify_all();
}
}
}
Expand Down