Skip to content

Commit

Permalink
rt: update MultiThread to use its own Handle (#5025)
Browse files Browse the repository at this point in the history
This is the equivalent as #5023, but for the MultiThread
scheduler. This patch updates `MultiThread` to use `Arc<Handle>` for all
of its cross-thread needs instead of `Arc<Shared>`.

The effect of this change is that the multi-thread scheduler only has a
single `Arc` type, which includes the driver handles, keeping all
"shared state" together.
  • Loading branch information
carllerche authored Sep 17, 2022
1 parent 2effa7f commit cdd6eea
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 143 deletions.
24 changes: 11 additions & 13 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -993,10 +993,9 @@ cfg_test_util! {
cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sync::Arc;
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, Scheduler};
use crate::runtime::scheduler::{self, multi_thread, MultiThread};
use crate::runtime::scheduler::{self, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

Expand All @@ -1007,9 +1006,16 @@ cfg_rt_multi_thread! {
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();

let (scheduler, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
Expand All @@ -1018,20 +1024,12 @@ cfg_rt_multi_thread! {
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: self.seed_generator.next_generator(),
seed_generator: seed_generator_1,
},
);

let inner = Arc::new(multi_thread::Handle {
spawner: scheduler.spawner().clone(),
driver: driver_handle,
blocking_spawner,
seed_generator: self.seed_generator.next_generator(),
});
let inner = scheduler::Handle::MultiThread(inner);

// Create the runtime handle
let handle = Handle { inner };
let handle = scheduler::Handle::MultiThread(scheduler.handle().clone());
let handle = Handle { inner: handle };

// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ cfg_rt! {
Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),

#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(h) => h.spawner.spawn(future, id),
Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
}
}

Expand All @@ -92,7 +92,7 @@ cfg_rt! {
Handle::CurrentThread(_) => {},

#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(ref h) => h.spawner.shutdown(),
Handle::MultiThread(ref h) => h.shutdown(),
}
}

Expand Down
59 changes: 50 additions & 9 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use crate::runtime::scheduler::multi_thread::Spawner;
use crate::runtime::{blocking, driver};
use crate::future::Future;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread::worker;
use crate::runtime::{
blocking, driver,
task::{self, JoinHandle},
};
use crate::util::RngSeedGenerator;

use std::fmt;

/// Handle to the multi thread scheduler
#[derive(Debug)]
pub(crate) struct Handle {
/// Task spawner
pub(crate) spawner: Spawner,
pub(super) shared: worker::Shared,

/// Resource driver handles
pub(crate) driver: driver::Handle,
Expand All @@ -18,28 +24,63 @@ pub(crate) struct Handle {
pub(crate) seed_generator: RngSeedGenerator,
}

impl Handle {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
Self::bind_new_task(me, future, id)
}

pub(crate) fn shutdown(&self) {
self.shared.close();
}

pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

if let Some(notified) = notified {
me.shared.schedule(notified, false);
}

handle
}
}

cfg_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};

impl Handle {
pub(crate) fn num_workers(&self) -> usize {
self.spawner.shared.worker_metrics.len()
self.shared.worker_metrics.len()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.spawner.shared.scheduler_metrics
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.spawner.shared.worker_metrics[worker]
&self.shared.worker_metrics[worker]
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.spawner.shared.injection_queue_depth()
self.shared.injection_queue_depth()
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.spawner.shared.worker_local_queue_depth(worker)
self.shared.worker_local_queue_depth(worker)
}
}
}

impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("multi_thread::Handle { ... }").finish()
}
}
73 changes: 23 additions & 50 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,38 @@ pub(crate) use worker::Launch;
pub(crate) use worker::block_in_place;

use crate::loom::sync::Arc;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Config, Driver};
use crate::runtime::{blocking, driver, Config, Driver};
use crate::util::RngSeedGenerator;

use std::fmt;
use std::future::Future;

/// Work-stealing based thread pool for executing futures.
pub(crate) struct MultiThread {
spawner: Spawner,
}

/// Submits futures to the associated thread pool for execution.
///
/// A `Spawner` instance is a handle to a single thread pool that allows the owner
/// of the handle to spawn futures onto the thread pool.
///
/// The `Spawner` handle is *only* used for spawning new futures. It does not
/// impact the lifecycle of the thread pool in any way. The thread pool may
/// shut down while there are outstanding `Spawner` instances.
///
/// `Spawner` instances are obtained by calling [`MultiThread::spawner`].
///
/// [`MultiThread::spawner`]: method@MultiThread::spawner
#[derive(Clone)]
pub(crate) struct Spawner {
shared: Arc<worker::Shared>,
handle: Arc<Handle>,
}

// ===== impl MultiThread =====

impl MultiThread {
pub(crate) fn new(size: usize, driver: Driver, config: Config) -> (MultiThread, Launch) {
pub(crate) fn new(
size: usize,
driver: Driver,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, Launch) {
let parker = Parker::new(driver);
let (shared, launch) = worker::create(size, parker, config);
let spawner = Spawner { shared };
let multi_thread = MultiThread { spawner };
let (handle, launch) = worker::create(
size,
parker,
driver_handle,
blocking_spawner,
seed_generator,
config,
);
let multi_thread = MultiThread { handle };

(multi_thread, launch)
}
Expand All @@ -61,8 +57,8 @@ impl MultiThread {
///
/// The `Spawner` handle can be cloned and enables spawning tasks from other
/// threads.
pub(crate) fn spawner(&self) -> &Spawner {
&self.spawner
pub(crate) fn handle(&self) -> &Arc<Handle> {
&self.handle
}

/// Blocks the current thread waiting for the future to complete.
Expand All @@ -86,29 +82,6 @@ impl fmt::Debug for MultiThread {

impl Drop for MultiThread {
fn drop(&mut self) {
self.spawner.shutdown();
}
}

// ==== impl Spawner =====

impl Spawner {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
worker::Shared::bind_new_task(&self.shared, future, id)
}

pub(crate) fn shutdown(&self) {
self.shared.close();
}
}

impl fmt::Debug for Spawner {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Spawner").finish()
self.handle.shutdown();
}
}
Loading

0 comments on commit cdd6eea

Please sign in to comment.