Skip to content

Commit

Permalink
rt: add rng_seed option to runtime::Builder (#4910)
Browse files Browse the repository at this point in the history
The `tokio::select!` macro polls branches in a random order. While this
is desirable in production, for testing purposes a more deterministic
approach can be useul.

This change adds an additional parameter `rng_seed` to the runtime
`Builder` to set the random number generator seed. This value is then
used to reset the seed on the current thread when the runtime is entered
into (restoring the previous value when the thread leaves the runtime). All
threads created explicitly by the runtime also have a seed set as the
runtime is built. Each thread is set with a seed from a deterministic
sequence.

This guarantees that calls to the `tokio::select!` macro which are
performed in the same order on the same thread will poll branches in the
same order.

Additionally, the peer chosen to attempt to steal work from also uses a
deterministic sequence if `rng_seed` is set.

Both the builder parameter as well as the `RngSeed` struct are marked
unstable initially.
  • Loading branch information
hds authored Sep 16, 2022
1 parent 9e02759 commit b5709ba
Show file tree
Hide file tree
Showing 12 changed files with 352 additions and 31 deletions.
7 changes: 7 additions & 0 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn;
use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
use crate::util::{replace_thread_rng, RngSeedGenerator};

use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -48,6 +49,9 @@ struct Inner {

// Customizable wait timeout.
keep_alive: Duration,

// Random number seed
seed_generator: RngSeedGenerator,
}

struct Shared {
Expand Down Expand Up @@ -182,6 +186,7 @@ impl BlockingPool {
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
seed_generator: builder.seed_generator.next_generator(),
}),
},
shutdown_rx,
Expand Down Expand Up @@ -431,6 +436,8 @@ impl Inner {
if let Some(f) = &self.after_start {
f()
}
// We own this thread so there is no need to replace the RngSeed once we're done.
let _ = replace_thread_rng(self.seed_generator.next_seed());

let mut shared = self.shared.lock();
let mut join_on_thread = None;
Expand Down
46 changes: 46 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::runtime::handle::Handle;
use crate::runtime::{blocking, driver, Callback, Runtime, Spawner};
use crate::util::{RngSeed, RngSeedGenerator};

use std::fmt;
use std::io;
Expand Down Expand Up @@ -90,6 +91,9 @@ pub struct Builder {
/// This option should only be exposed as unstable.
pub(super) disable_lifo_slot: bool,

/// Specify a random number generator seed to provide deterministic results
pub(super) seed_generator: RngSeedGenerator,

#[cfg(tokio_unstable)]
pub(super) unhandled_panic: UnhandledPanic,
}
Expand Down Expand Up @@ -255,6 +259,8 @@ impl Builder {
global_queue_interval,
event_interval,

seed_generator: RngSeedGenerator::new(RngSeed::new()),

#[cfg(tokio_unstable)]
unhandled_panic: UnhandledPanic::Ignore,

Expand Down Expand Up @@ -829,6 +835,42 @@ impl Builder {
self.disable_lifo_slot = true;
self
}

/// Specifies the random number generation seed to use within all threads associated
/// with the runtime being built.
///
/// This option is intended to make certain parts of the runtime deterministic.
/// Specifically, it affects the [`tokio::select!`] macro and the work stealing
/// algorithm. In the case of [`tokio::select!`] it will ensure that the order that
/// branches are polled is deterministic.
///
/// In the case of work stealing, it's a little more complicated. Each worker will
/// be given a deterministic seed so that the starting peer for each work stealing
/// search will be deterministic.
///
/// In addition to the code specifying `rng_seed` and interacting with the runtime,
/// the internals of Tokio and the Rust compiler may affect the sequences of random
/// numbers. In order to ensure repeatable results, the version of Tokio, the versions
/// of all other dependencies that interact with Tokio, and the Rust compiler version
/// should also all remain constant.
///
/// # Examples
///
/// ```
/// # use tokio::runtime::{self, RngSeed};
/// # pub fn main() {
/// let seed = RngSeed::from_bytes(b"place your seed here");
/// let rt = runtime::Builder::new_current_thread()
/// .rng_seed(seed)
/// .build();
/// # }
/// ```
///
/// [`tokio::select!`]: crate::select
pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
self.seed_generator = RngSeedGenerator::new(seed);
self
}
}

fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
Expand All @@ -855,6 +897,7 @@ impl Builder {
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: self.seed_generator.next_generator(),
},
);
let spawner = Spawner::CurrentThread(scheduler.spawner().clone());
Expand All @@ -863,6 +906,7 @@ impl Builder {
spawner,
driver: driver_handle,
blocking_spawner,
seed_generator: self.seed_generator.next_generator(),
});

Ok(Runtime {
Expand Down Expand Up @@ -972,6 +1016,7 @@ cfg_rt_multi_thread! {
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: self.seed_generator.next_generator(),
},
);
let spawner = Spawner::MultiThread(scheduler.spawner().clone());
Expand All @@ -980,6 +1025,7 @@ cfg_rt_multi_thread! {
spawner,
driver: driver_handle,
blocking_spawner,
seed_generator: self.seed_generator.next_generator(),
});

// Create the runtime handle
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg_attr(any(not(feature = "full"), tokio_wasm), allow(dead_code))]
use crate::runtime::Callback;
use crate::util::RngSeedGenerator;

pub(crate) struct Config {
/// How many ticks before pulling a task from the global/remote queue?
Expand All @@ -23,6 +24,10 @@ pub(crate) struct Config {
/// stop-gap, this unstable option lets users disable the LIFO task.
pub(crate) disable_lifo_slot: bool,

/// Random number generator seed to configure runtimes to act in a
/// deterministic way.
pub(crate) seed_generator: RngSeedGenerator,

#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
Expand Down
25 changes: 17 additions & 8 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Thread local runtime context
use crate::runtime::{Handle, TryCurrentError};
use crate::util::{replace_thread_rng, RngSeed};

use std::cell::RefCell;

Expand Down Expand Up @@ -99,21 +100,29 @@ pub(crate) fn enter(new: Handle) -> EnterGuard {
///
/// [`Handle`]: Handle
pub(crate) fn try_enter(new: Handle) -> Option<EnterGuard> {
CONTEXT
.try_with(|ctx| {
let old = ctx.borrow_mut().replace(new);
EnterGuard(old)
})
.ok()
let rng_seed = new.as_inner().seed_generator.next_seed();
let old_handle = CONTEXT.try_with(|ctx| ctx.borrow_mut().replace(new)).ok()?;

let old_seed = replace_thread_rng(rng_seed);

Some(EnterGuard {
old_handle,
old_seed,
})
}

#[derive(Debug)]
pub(crate) struct EnterGuard(Option<Handle>);
pub(crate) struct EnterGuard {
old_handle: Option<Handle>,
old_seed: RngSeed,
}

impl Drop for EnterGuard {
fn drop(&mut self) {
CONTEXT.with(|ctx| {
*ctx.borrow_mut() = self.0.take();
*ctx.borrow_mut() = self.old_handle.take();
});
// We discard the RngSeed associated with this guard
let _ = replace_thread_rng(self.old_seed.clone());
}
}
7 changes: 7 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

use crate::runtime::driver;

#[cfg(feature = "rt")]
use crate::util::RngSeedGenerator;

use std::sync::Arc;

/// Handle to the runtime.
Expand Down Expand Up @@ -32,6 +35,10 @@ pub(crate) struct HandleInner {
/// Blocking pool spawner
#[cfg(feature = "rt")]
pub(crate) blocking_spawner: blocking::Spawner,

/// Current random number generator seed
#[cfg(feature = "rt")]
pub(super) seed_generator: RngSeedGenerator,
}

cfg_rt! {
Expand Down
1 change: 1 addition & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ cfg_rt! {
pub use self::builder::Builder;
cfg_unstable! {
pub use self::builder::UnhandledPanic;
pub use crate::util::RngSeed;
}

pub(crate) mod context;
Expand Down
3 changes: 1 addition & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
use crate::coop;
use crate::future::Future;
use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::enter::EnterContext;
Expand Down Expand Up @@ -206,7 +205,7 @@ pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc<Shared>,
is_shutdown: false,
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(seed()),
rand: FastRand::new(config.seed_generator.next_seed()),
}));

remotes.push(Remote { steal, unpark });
Expand Down
16 changes: 11 additions & 5 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ pub(crate) use wake_list::WakeList;
))]
pub(crate) mod linked_list;

#[cfg(any(feature = "rt-multi-thread", feature = "macros"))]
#[cfg(any(feature = "rt", feature = "macros"))]
mod rand;

cfg_rt! {
mod idle_notified_set;
pub(crate) use idle_notified_set::IdleNotifiedSet;

pub(crate) use self::rand::{RngSeedGenerator,replace_thread_rng};

mod wake;
pub(crate) use wake::WakerRef;
pub(crate) use wake::{waker_ref, Wake};
Expand All @@ -61,6 +63,14 @@ cfg_rt! {
pub(crate) use rc_cell::RcCell;
}

#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[cfg(feature = "rt")]
pub use self::rand::RngSeed;

#[cfg(any(feature = "macros"))]
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
pub use self::rand::thread_rng_n;

cfg_rt_multi_thread! {
pub(crate) use self::rand::FastRand;

Expand All @@ -70,8 +80,4 @@ cfg_rt_multi_thread! {

pub(crate) mod trace;

#[cfg(any(feature = "macros"))]
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
pub use self::rand::thread_rng_n;

pub(crate) mod error;
Loading

0 comments on commit b5709ba

Please sign in to comment.