Skip to content

Commit

Permalink
introduce a random start index to make shutdown faster
Browse files Browse the repository at this point in the history
  • Loading branch information
wathenjiang committed Sep 20, 2023
1 parent 17b0be9 commit 5020db8
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 17 deletions.
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
// Drain the OwnedTasks collection. This call also closes the
// collection, ensuring that no tasks are ever pushed after this
// call returns.
handle.shared.owned.close_and_shutdown_all();
handle.shared.owned.close_and_shutdown_all(0);

// Drain local queue
// We already shut down every task, so we just need to drop the task.
Expand Down Expand Up @@ -615,7 +615,7 @@ impl Schedule for Arc<Handle> {
// If `None`, the runtime is shutting down, so there is no need to signal shutdown
if let Some(core) = core.as_mut() {
core.unhandled_panic = true;
self.shared.owned.close_and_shutdown_all();
self.shared.owned.close_and_shutdown_all(0);
}
}
_ => unreachable!("runtime core not set in CURRENT thread-local"),
Expand Down
10 changes: 9 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -954,8 +954,16 @@ impl Core {
/// Signals all tasks to shut down, and waits for them to complete. Must run
/// before we enter the single-threaded phase of shutdown processing.
fn pre_shutdown(&mut self, worker: &Worker) {
// Start from a random inner list
let start = self
.rand
.fastrand_n(worker.handle.shared.owned.grain as u32);
// Signal to all tasks to shut down.
worker.handle.shared.owned.close_and_shutdown_all();
worker
.handle
.shared
.owned
.close_and_shutdown_all(start as usize);

self.stats
.submit(&worker.handle.shared.worker_metrics[worker.index]);
Expand Down
6 changes: 4 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ impl Worker {
}
}

// Start from a random inner list
let start = core.rand.fastrand_n(cx.shared().owned.grain as u32);
cx.shared().shutdown_core(&cx.handle, core);

// It is possible that tasks wake others during drop, so we need to
Expand Down Expand Up @@ -1459,8 +1461,8 @@ impl Shared {
self.condvars.len() > self.remotes.len()
}

pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
self.owned.close_and_shutdown_all();
pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>, start: u32) {
self.owned.close_and_shutdown_all(start as usize);

core.stats.submit(&self.worker_metrics[core.index]);

Expand Down
21 changes: 10 additions & 11 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ fn get_next_id() -> NonZeroU32 {
}

pub(crate) struct OwnedTasks<S: 'static> {
lists: Vec<Mutex<ListInner<S>>>,
lists: Box<[Mutex<ListInner<S>>]>,
pub(crate) id: NonZeroU32,
closed: AtomicBool,
grain: usize,
pub(crate) grain: u32,
count: AtomicUsize,
}

Expand All @@ -59,18 +59,18 @@ struct OwnedTasksInner<S: 'static> {

impl<S: 'static> OwnedTasks<S> {
/// `grain` must be an integer power of 2.
pub(crate) fn new(grain: usize) -> Self {
pub(crate) fn new(grain: u32) -> Self {
assert_eq!(
grain & (grain - 1),
0,
"the grain of OwnedTasks must be an integer power of 2"
);
let mut lists = Vec::with_capacity(grain);
let mut lists = Vec::with_capacity(grain as usize);
for _ in 0..grain {
lists.push(Mutex::new(LinkedList::new()))
}
Self {
lists,
lists: lists.into_boxed_slice(),
closed: AtomicBool::new(false),
id: get_next_id(),
grain,
Expand Down Expand Up @@ -124,7 +124,7 @@ impl<S: 'static> OwnedTasks<S> {
where
S: Schedule,
{
let mut lock = self.lists[task_id.0 as usize & (self.grain - 1)].lock();
let mut lock = self.lists[task_id.0 as usize & (self.grain - 1) as usize].lock();
// check close flag
if self.closed.load(Ordering::Acquire) {
task.shutdown();
Expand All @@ -150,17 +150,16 @@ impl<S: 'static> OwnedTasks<S> {

/// Shuts down all tasks in the collection. This call also closes the
/// collection, preventing new items from being added.
pub(crate) fn close_and_shutdown_all(&self)
pub(crate) fn close_and_shutdown_all(&self, start: usize)
where
S: Schedule,
{
// Set the closed flag before draining, so that no new tasks
// can be bound to this collection once draining begins.
self.closed.store(true, Ordering::Release);

for list in &self.lists {
for i in start..self.lists.len() + start {
loop {
let task = match list.lock().pop_back() {
let task = match self.lists[i % (self.lists.len())].lock().pop_back() {
Some(task) => {
self.count.fetch_sub(1, Ordering::Relaxed);
task
Expand Down Expand Up @@ -190,7 +189,7 @@ impl<S: 'static> OwnedTasks<S> {

#[inline]
unsafe fn remove_inner(&self, task: &Task<S>) -> Option<Task<S>> {
match self.lists[(task.header().task_id.0) as usize & (self.grain - 1)]
match self.lists[(task.header().task_id.0) as usize & (self.grain - 1) as usize]
.lock()
.remove(task.header_ptr())
{
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/tests/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl Runtime {
fn shutdown(&self) {
let mut core = self.0.core.try_lock().unwrap();

self.0.owned.close_and_shutdown_all();
self.0.owned.close_and_shutdown_all(0);

while let Some(task) = core.queue.pop_back() {
drop(task);
Expand Down

0 comments on commit 5020db8

Please sign in to comment.