Skip to content

Commit

Permalink
rt: add runtime::context to unify thread-locals (#5143)
Browse files Browse the repository at this point in the history
This patch is the first step towards unifying all the thread-local
variables spread out across Tokio. A new `Context` struct is added which
will be used to replace the various thread-locals that exist today.

Initially, `Context` only holds the current runtime handle and the
random number generator. Further PRs will add other thread-local state.

A previous PR removed `runtime::context`. At that time,
`runtime::context` was used as an extra layer to access the various
runtime driver handles. This version of `runtime::context` serves a
different purpose (unifying all the thread-locals).
  • Loading branch information
carllerche authored Oct 31, 2022
1 parent d1a8ec6 commit df99428
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 100 deletions.
2 changes: 1 addition & 1 deletion tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,8 @@ cfg_rt! {
pub mod runtime;
}
cfg_not_rt! {
// The `runtime` module is used when the IO or time driver is needed.
#[cfg(any(
feature = "macros",
feature = "net",
feature = "time",
all(unix, feature = "process"),
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/macros/support.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
cfg_macros! {
pub use crate::future::poll_fn;
pub use crate::future::maybe_done::maybe_done;
pub use crate::util::thread_rng_n;

#[doc(hidden)]
pub fn thread_rng_n(n: u32) -> u32 {
crate::runtime::context::thread_rng_n(n)
}
}

pub use std::future::Future;
Expand Down
7 changes: 0 additions & 7 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::runtime::blocking::{shutdown, BlockingTask};
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
use crate::util::{replace_thread_rng, RngSeedGenerator};

use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -48,9 +47,6 @@ struct Inner {

// Customizable wait timeout.
keep_alive: Duration,

// Random number seed
seed_generator: RngSeedGenerator,
}

struct Shared {
Expand Down Expand Up @@ -185,7 +181,6 @@ impl BlockingPool {
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
seed_generator: builder.seed_generator.next_generator(),
}),
},
shutdown_rx,
Expand Down Expand Up @@ -435,8 +430,6 @@ impl Inner {
if let Some(f) = &self.after_start {
f()
}
// We own this thread so there is no need to replace the RngSeed once we're done.
let _ = replace_thread_rng(self.seed_generator.next_seed());

let mut shared = self.shared.lock();
let mut join_on_thread = None;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::runtime::handle::Handle;
use crate::runtime::{blocking, driver, Callback, Runtime};
use crate::util::{RngSeed, RngSeedGenerator};
use crate::util::rand::{RngSeed, RngSeedGenerator};

use std::fmt;
use std::io;
Expand Down
72 changes: 72 additions & 0 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crate::util::rand::{FastRand, RngSeed};

cfg_rt! {
use crate::runtime::scheduler;
use std::cell::RefCell;
}

struct Context {
/// Handle to the runtime scheduler running on the current thread.
#[cfg(feature = "rt")]
scheduler: RefCell<Option<scheduler::Handle>>,
rng: FastRand,
}

tokio_thread_local! {
static CONTEXT: Context = {
Context {
#[cfg(feature = "rt")]
scheduler: RefCell::new(None),
rng: FastRand::new(RngSeed::new()),
}
}
}

#[cfg(feature = "macros")]
pub(crate) fn thread_rng_n(n: u32) -> u32 {
CONTEXT.with(|ctx| ctx.rng.fastrand_n(n))
}

cfg_rt! {
use crate::runtime::TryCurrentError;

#[derive(Debug)]
pub(crate) struct EnterGuard {
old_handle: Option<scheduler::Handle>,
old_seed: RngSeed,
}

pub(crate) fn try_current() -> Result<scheduler::Handle, TryCurrentError> {
match CONTEXT.try_with(|ctx| ctx.scheduler.borrow().clone()) {
Ok(Some(handle)) => Ok(handle),
Ok(None) => Err(TryCurrentError::new_no_context()),
Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()),
}
}

/// Sets this [`Handle`] as the current active [`Handle`].
///
/// [`Handle`]: crate::runtime::scheduler::Handle
pub(crate) fn try_enter(handle: &scheduler::Handle) -> Option<EnterGuard> {
let rng_seed = handle.seed_generator().next_seed();

CONTEXT.try_with(|ctx| {
let old_handle = ctx.scheduler.borrow_mut().replace(handle.clone());
let old_seed = ctx.rng.replace_seed(rng_seed);

EnterGuard {
old_handle,
old_seed,
}
}).ok()
}

impl Drop for EnterGuard {
fn drop(&mut self) {
CONTEXT.with(|ctx| {
*ctx.scheduler.borrow_mut() = self.old_handle.take();
ctx.rng.replace_seed(self.old_seed.clone());
});
}
}
}
6 changes: 3 additions & 3 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::{scheduler, RuntimeFlavor};
use crate::runtime::{context, scheduler, RuntimeFlavor};

/// Handle to the runtime.
///
Expand Down Expand Up @@ -29,7 +29,7 @@ use std::{error, fmt};
#[derive(Debug)]
#[must_use = "Creating and dropping a guard does nothing"]
pub struct EnterGuard<'a> {
_guard: scheduler::EnterGuard,
_guard: context::EnterGuard,
_handle_lifetime: PhantomData<&'a Handle>,
}

Expand Down Expand Up @@ -106,7 +106,7 @@ impl Handle {
///
/// Contrary to `current`, this never panics
pub fn try_current() -> Result<Self, TryCurrentError> {
scheduler::Handle::try_current().map(|inner| Handle { inner })
context::try_current().map(|inner| Handle { inner })
}

/// Spawns a future onto the Tokio runtime.
Expand Down
8 changes: 6 additions & 2 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@
#[macro_use]
mod tests;

#[cfg(any(feature = "rt", feature = "macros"))]
pub(crate) mod context;

mod driver;

pub(crate) mod scheduler;

cfg_io_driver_impl! {
Expand Down Expand Up @@ -223,7 +227,7 @@ cfg_rt! {
pub use self::builder::Builder;
cfg_unstable! {
pub use self::builder::UnhandledPanic;
pub use crate::util::RngSeed;
pub use crate::util::rand::RngSeed;
}

use self::enter::enter;
Expand Down Expand Up @@ -632,7 +636,7 @@ cfg_rt! {
Scheduler::CurrentThread(current_thread) => {
// This ensures that tasks spawned on the current-thread
// runtime are dropped inside the runtime's context.
match self.handle.inner.try_enter() {
match context::try_enter(&self.handle.inner) {
Some(guard) => current_thread.set_context_guard(guard),
None => {
// The context thread-local has already been destroyed.
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::EnterGuard;
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{blocking, Config};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
Expand Down
60 changes: 11 additions & 49 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,64 +44,30 @@ impl Handle {
cfg_rt! {
use crate::future::Future;
use crate::loom::sync::Arc;
use crate::runtime::{blocking, task::Id, TryCurrentError};
use crate::runtime::{blocking, task::Id};
use crate::runtime::context::{self, EnterGuard};
use crate::task::JoinHandle;
use crate::util::{replace_thread_rng, RngSeed, RngSeedGenerator};

use std::cell::RefCell;

#[derive(Debug)]
pub(crate) struct EnterGuard {
old_handle: Option<Handle>,
old_seed: RngSeed,
}

tokio_thread_local! {
static CURRENT: RefCell<Option<Handle>> = const { RefCell::new(None) }
}
use crate::util::RngSeedGenerator;

impl Handle {
#[track_caller]
pub(crate) fn current() -> Handle {
match Handle::try_current() {
match context::try_current() {
Ok(handle) => handle,
Err(e) => panic!("{}", e),
}
}

pub(crate) fn try_current() -> Result<Handle, TryCurrentError> {
match CURRENT.try_with(|ctx| ctx.borrow().clone()) {
Ok(Some(handle)) => Ok(handle),
Ok(None) => Err(TryCurrentError::new_no_context()),
Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()),
}
}

/// Sets this [`Handle`] as the current active [`Handle`].
///
/// [`Handle`]: Handle
pub(crate) fn enter(&self) -> EnterGuard {
match self.try_enter() {
match context::try_enter(self) {
Some(guard) => guard,
None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
}

/// Sets this [`Handle`] as the current active [`Handle`].
///
/// [`Handle`]: Handle
pub(crate) fn try_enter(&self) -> Option<EnterGuard> {
let rng_seed = self.seed_generator().next_seed();
let old_handle = CURRENT.try_with(|ctx| ctx.borrow_mut().replace(self.clone())).ok()?;

let old_seed = replace_thread_rng(rng_seed);

Some(EnterGuard {
old_handle,
old_seed,
})
}

pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
match self {
Handle::CurrentThread(h) => &h.blocking_spawner,
Expand Down Expand Up @@ -143,16 +109,6 @@ cfg_rt! {
}
}

impl Drop for EnterGuard {
fn drop(&mut self) {
CURRENT.with(|ctx| {
*ctx.borrow_mut() = self.old_handle.take();
});
// We discard the RngSeed associated with this guard
let _ = replace_thread_rng(self.old_seed.clone());
}
}

cfg_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};

Expand Down Expand Up @@ -201,6 +157,12 @@ cfg_rt! {
}

cfg_not_rt! {
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
feature = "time",
))]
impl Handle {
#[track_caller]
pub(crate) fn current() -> Handle {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use crate::runtime::{
blocking, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics,
};
use crate::util::atomic_cell::AtomicCell;
use crate::util::{FastRand, RngSeedGenerator};
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::time::Duration;
Expand Down
14 changes: 2 additions & 12 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ pub(crate) use wake_list::WakeList;
pub(crate) mod linked_list;

#[cfg(any(feature = "rt", feature = "macros"))]
mod rand;
pub(crate) mod rand;

cfg_rt! {
mod idle_notified_set;
pub(crate) use idle_notified_set::IdleNotifiedSet;

pub(crate) use self::rand::{RngSeedGenerator,replace_thread_rng};
pub(crate) use self::rand::RngSeedGenerator;

mod wake;
pub(crate) use wake::WakerRef;
Expand All @@ -66,17 +66,7 @@ cfg_rt! {
pub(crate) use rc_cell::RcCell;
}

#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[cfg(feature = "rt")]
pub use self::rand::RngSeed;

#[cfg(any(feature = "macros"))]
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
pub use self::rand::thread_rng_n;

cfg_rt_multi_thread! {
pub(crate) use self::rand::FastRand;

mod try_lock;
pub(crate) use try_lock::TryLock;
}
Expand Down
22 changes: 0 additions & 22 deletions tokio/src/util/rand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,25 +156,3 @@ impl FastRand {
s0.wrapping_add(s1)
}
}

tokio_thread_local! {
static THREAD_RNG: FastRand = FastRand::new(RngSeed::new());
}

/// Seeds the thread local random number generator with the provided seed and
/// return the previously stored seed.
///
/// The returned seed can be later used to return the thread local random number
/// generator to its previous state.
#[cfg(feature = "rt")]
pub(crate) fn replace_thread_rng(rng_seed: RngSeed) -> RngSeed {
THREAD_RNG.with(|rng| rng.replace_seed(rng_seed))
}

// Used by the select macro and `StreamMap`
#[cfg(any(feature = "macros"))]
#[doc(hidden)]
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
pub fn thread_rng_n(n: u32) -> u32 {
THREAD_RNG.with(|rng| rng.fastrand_n(n))
}

0 comments on commit df99428

Please sign in to comment.