Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: move driver unpark out of multi-thread parker #5026

Merged
merged 1 commit into from
Sep 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ impl MultiThread {
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, Launch) {
let driver_unpark = driver.unpark();
let parker = Parker::new(driver);
let (handle, launch) = worker::create(
size,
parker,
driver_handle,
driver_unpark,
blocking_spawner,
seed_generator,
config,
Expand Down
20 changes: 5 additions & 15 deletions tokio/src/runtime/scheduler/multi_thread/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::driver::{Driver, Unpark};
use crate::runtime::driver::{self, Driver};
use crate::util::TryLock;

use std::sync::atomic::Ordering::SeqCst;
Expand Down Expand Up @@ -42,23 +42,17 @@ const NOTIFIED: usize = 3;
struct Shared {
/// Shared driver. Only one thread at a time can use this
driver: TryLock<Driver>,

/// Unpark handle
handle: Unpark,
}

impl Parker {
pub(crate) fn new(driver: Driver) -> Parker {
let handle = driver.unpark();

Parker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
shared: Arc::new(Shared {
driver: TryLock::new(driver),
handle,
}),
}),
}
Expand Down Expand Up @@ -102,8 +96,8 @@ impl Clone for Parker {
}

impl Unparker {
pub(crate) fn unpark(&self) {
self.inner.unpark();
pub(crate) fn unpark(&self, driver: &driver::Unpark) {
self.inner.unpark(driver);
}
}

Expand Down Expand Up @@ -201,7 +195,7 @@ impl Inner {
}
}

fn unpark(&self) {
fn unpark(&self, driver: &driver::Unpark) {
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
Expand All @@ -211,7 +205,7 @@ impl Inner {
EMPTY => {} // no one was waiting
NOTIFIED => {} // already unparked
PARKED_CONDVAR => self.unpark_condvar(),
PARKED_DRIVER => self.unpark_driver(),
PARKED_DRIVER => driver.unpark(),
actual => panic!("inconsistent state in unpark; actual = {}", actual),
}
}
Expand All @@ -233,10 +227,6 @@ impl Inner {
self.condvar.notify_one()
}

fn unpark_driver(&self) {
self.shared.handle.unpark();
}

fn shutdown(&self) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();
Expand Down
9 changes: 7 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ pub(super) struct Shared {
/// how they communicate between each other.
remotes: Box<[Remote]>,

/// Used to unpark threads blocked on the I/O driver
driver: driver::Unpark,

/// Global task queue used for:
/// 1. Submit work to the scheduler while **not** currently on a worker thread.
/// 2. Submit work to the scheduler when a worker run queue is saturated
Expand Down Expand Up @@ -190,6 +193,7 @@ pub(super) fn create(
size: usize,
park: Parker,
driver_handle: driver::Handle,
driver_unpark: driver::Unpark,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
Expand Down Expand Up @@ -223,6 +227,7 @@ pub(super) fn create(
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
driver: driver_unpark,
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
Expand Down Expand Up @@ -774,13 +779,13 @@ impl Shared {

fn notify_parked(&self) {
if let Some(index) = self.idle.worker_to_notify() {
self.remotes[index].unpark.unpark();
self.remotes[index].unpark.unpark(&self.driver);
}
}

fn notify_all(&self) {
for remote in &self.remotes[..] {
remote.unpark.unpark();
remote.unpark.unpark(&self.driver);
}
}

Expand Down