Skip to content

Commit

Permalink
rt: create driver::Handle struct (#5018)
Browse files Browse the repository at this point in the history
Move all individual driver handles into a single `driver::Handle`
struct.
  • Loading branch information
carllerche authored Sep 15, 2022
1 parent 198e2d8 commit 9e02759
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 115 deletions.
14 changes: 4 additions & 10 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ impl Builder {
use crate::runtime::{Config, CurrentThread, HandleInner, Scheduler};
use std::sync::Arc;

let (driver, resources) = driver::Driver::new(self.get_cfg())?;
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
Expand All @@ -861,10 +861,7 @@ impl Builder {

let inner = Arc::new(HandleInner {
spawner,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
driver: driver_handle,
blocking_spawner,
});

Expand Down Expand Up @@ -957,7 +954,7 @@ cfg_rt_multi_thread! {

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, resources) = driver::Driver::new(self.get_cfg())?;
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
let blocking_pool =
Expand All @@ -981,10 +978,7 @@ cfg_rt_multi_thread! {

let inner = Arc::new(HandleInner {
spawner,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
driver: driver_handle,
blocking_spawner,
});

Expand Down
22 changes: 19 additions & 3 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ cfg_io_driver! {
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().io_handle.clone()
ctx.as_ref()
.expect(crate::util::error::CONTEXT_MISSING_ERROR)
.as_inner()
.driver
.io
.clone()
}) {
Ok(io_handle) => io_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
Expand All @@ -41,7 +46,12 @@ cfg_signal_internal! {
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().signal_handle.clone()
ctx.as_ref()
.expect(crate::util::error::CONTEXT_MISSING_ERROR)
.as_inner()
.driver
.signal
.clone()
}) {
Ok(signal_handle) => signal_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
Expand All @@ -52,7 +62,13 @@ cfg_signal_internal! {
cfg_time! {
cfg_test_util! {
pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.as_inner().clock.clone())) {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref()
.map(|ctx| {
ctx.as_inner().driver.clock.clone()
})
}) {
Ok(clock) => clock,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
Expand Down
128 changes: 68 additions & 60 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,74 @@ use crate::park::thread::{ParkThread, UnparkThread};
use std::io;
use std::time::Duration;

#[derive(Debug)]
pub(crate) struct Driver {
inner: TimeDriver,
}

#[derive(Debug)]
pub(crate) struct Handle {
/// IO driver handle
pub(crate) io: IoHandle,

/// Signal driver handle
#[cfg_attr(any(not(unix), loom), allow(dead_code))]
pub(crate) signal: SignalHandle,

/// Time driver handle
pub(crate) time: TimeHandle,

/// Source of `Instant::now()`
#[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
pub(crate) clock: Clock,
}

pub(crate) struct Cfg {
pub(crate) enable_io: bool,
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
}

pub(crate) type Unpark = TimerUnpark;

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());

Ok((
Self { inner: time_driver },
Handle {
io: io_handle,
signal: signal_handle,
time: time_handle,
clock,
},
))
}

pub(crate) fn unpark(&self) -> TimerUnpark {
self.inner.unpark()
}

pub(crate) fn park(&mut self) {
self.inner.park()
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.inner.park_timeout(duration)
}

pub(crate) fn shutdown(&mut self) {
self.inner.shutdown()
}
}

// ===== io driver =====

cfg_io_driver! {
Expand Down Expand Up @@ -266,63 +334,3 @@ cfg_not_time! {
(io_stack, ())
}
}

// ===== runtime driver =====

#[derive(Debug)]
pub(crate) struct Driver {
inner: TimeDriver,
}

pub(crate) type Unpark = TimerUnpark;

pub(crate) struct Resources {
pub(crate) io_handle: IoHandle,
pub(crate) signal_handle: SignalHandle,
pub(crate) time_handle: TimeHandle,
pub(crate) clock: Clock,
}

pub(crate) struct Cfg {
pub(crate) enable_io: bool,
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
}

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());

Ok((
Self { inner: time_driver },
Resources {
io_handle,
signal_handle,
time_handle,
clock,
},
))
}

pub(crate) fn unpark(&self) -> TimerUnpark {
self.inner.unpark()
}

pub(crate) fn park(&mut self) {
self.inner.park()
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.inner.park_timeout(duration)
}

pub(crate) fn shutdown(&mut self) {
self.inner.shutdown()
}
}
34 changes: 4 additions & 30 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,9 @@ pub(crate) struct HandleInner {
#[cfg(feature = "rt")]
pub(super) spawner: Spawner,

/// Handles to the I/O drivers
#[cfg_attr(
not(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
)),
allow(dead_code)
)]
pub(super) io_handle: driver::IoHandle,

/// Handles to the signal drivers
#[cfg_attr(
any(
loom,
not(all(unix, feature = "signal")),
not(all(unix, feature = "process")),
),
allow(dead_code)
)]
pub(super) signal_handle: driver::SignalHandle,

/// Handles to the time drivers
#[cfg_attr(not(feature = "time"), allow(dead_code))]
pub(super) time_handle: driver::TimeHandle,

/// Source of `Instant::now()`
#[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
pub(super) clock: driver::Clock,
/// Resource driver handles
#[cfg_attr(not(feature = "full"), allow(dead_code))]
pub(super) driver: driver::Handle,

/// Blocking pool spawner
#[cfg(feature = "rt")]
Expand Down Expand Up @@ -414,7 +388,7 @@ cfg_time! {
impl Handle {
#[track_caller]
pub(crate) fn as_time_handle(&self) -> &crate::runtime::time::Handle {
self.inner.time_handle.as_ref()
self.inner.driver.time.as_ref()
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}
}
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,8 @@ cfg_net! {
// thus this breaks that guarantee.
self.handle
.as_inner()
.io_handle
.driver
.io
.as_ref()
.map(|h| f(h.metrics()))
.unwrap_or(0)
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ impl TimerEntry {

unsafe {
self.driver()
.reregister(&self.driver.inner.io_handle, tick, self.inner().into());
.reregister(&self.driver.inner.driver.io, tick, self.inner().into());
}
}

Expand All @@ -573,7 +573,7 @@ impl TimerEntry {

fn driver(&self) -> &super::Handle {
// At this point, we know the time_handle is Some.
self.driver.inner.time_handle.as_ref().unwrap()
self.driver.inner.driver.time.as_ref().unwrap()
}
}

Expand Down
24 changes: 15 additions & 9 deletions tokio/src/runtime/time/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ fn single_timer() {

let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry =
TimerEntry::new(&handle_, handle_.inner.clock.now() + Duration::from_secs(1));
let entry = TimerEntry::new(
&handle_,
handle_.inner.driver.clock.now() + Duration::from_secs(1),
);
pin!(entry);

block_on(futures::future::poll_fn(|cx| {
Expand Down Expand Up @@ -79,8 +81,10 @@ fn drop_timer() {

let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry =
TimerEntry::new(&handle_, handle_.inner.clock.now() + Duration::from_secs(1));
let entry = TimerEntry::new(
&handle_,
handle_.inner.driver.clock.now() + Duration::from_secs(1),
);
pin!(entry);

let _ = entry
Expand Down Expand Up @@ -110,8 +114,10 @@ fn change_waker() {

let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry =
TimerEntry::new(&handle_, handle_.inner.clock.now() + Duration::from_secs(1));
let entry = TimerEntry::new(
&handle_,
handle_.inner.driver.clock.now() + Duration::from_secs(1),
);
pin!(entry);

let _ = entry
Expand Down Expand Up @@ -145,7 +151,7 @@ fn reset_future() {

let handle_ = handle.clone();
let finished_early_ = finished_early.clone();
let start = handle.inner.clock.now();
let start = handle.inner.driver.clock.now();

let jh = thread::spawn(move || {
let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1));
Expand Down Expand Up @@ -211,7 +217,7 @@ fn poll_process_levels() {
for i in 0..normal_or_miri(1024, 64) {
let mut entry = Box::pin(TimerEntry::new(
&handle,
handle.inner.clock.now() + Duration::from_millis(i),
handle.inner.driver.clock.now() + Duration::from_millis(i),
));

let _ = entry
Expand Down Expand Up @@ -245,7 +251,7 @@ fn poll_process_levels_targeted() {

let e1 = TimerEntry::new(
&handle,
handle.inner.clock.now() + Duration::from_millis(193),
handle.inner.driver.clock.now() + Duration::from_millis(193),
);
pin!(e1);

Expand Down

0 comments on commit 9e02759

Please sign in to comment.