From 80568dfc6da83f9c68c63bd9de66bcda295c76a9 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 24 Oct 2022 13:59:47 -0700 Subject: [PATCH] rt: misc time driver cleanup (#5120) Removes an unnecessary `Arc` and reduces internal state clones. --- tokio/src/runtime/time/entry.rs | 2 +- tokio/src/runtime/time/handle.rs | 17 +----- tokio/src/runtime/time/mod.rs | 94 ++++++++++++++------------------ tokio/src/runtime/time/source.rs | 2 +- tokio/src/time/sleep.rs | 15 ++--- 5 files changed, 51 insertions(+), 79 deletions(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index c4c4ba1b775..dbf778ed05e 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -576,7 +576,7 @@ impl TimerEntry { this.inner().state.poll(cx.waker()) } - fn driver(&self) -> &super::Handle { + pub(crate) fn driver(&self) -> &super::Handle { self.driver.time() } } diff --git a/tokio/src/runtime/time/handle.rs b/tokio/src/runtime/time/handle.rs index 8338f2b5b12..fce791d998c 100644 --- a/tokio/src/runtime/time/handle.rs +++ b/tokio/src/runtime/time/handle.rs @@ -1,31 +1,18 @@ -use crate::loom::sync::Arc; use crate::runtime::time::TimeSource; use std::fmt; /// Handle to time driver instance. -#[derive(Clone)] pub(crate) struct Handle { - time_source: TimeSource, - pub(super) inner: Arc, + pub(super) time_source: TimeSource, + pub(super) inner: super::Inner, } impl Handle { - /// Creates a new timer `Handle` from a shared `Inner` timer state. - pub(super) fn new(inner: Arc) -> Self { - let time_source = inner.state.lock().time_source.clone(); - Handle { time_source, inner } - } - /// Returns the time source associated with this handle. pub(crate) fn time_source(&self) -> &TimeSource { &self.time_source } - /// Access the driver's inner structure. - pub(super) fn get(&self) -> &super::Inner { - &*self.inner - } - /// Checks whether the driver has been shutdown. pub(super) fn is_shutdown(&self) -> bool { self.inner.is_shutdown() diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index a935f8cbb42..240f8f16e6d 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -19,7 +19,7 @@ pub(crate) use source::TimeSource; mod wheel; use crate::loom::sync::atomic::{AtomicBool, Ordering}; -use crate::loom::sync::{Arc, Mutex}; +use crate::loom::sync::Mutex; use crate::runtime::driver::{self, IoHandle, IoStack}; use crate::time::error::Error; use crate::time::{Clock, Duration}; @@ -84,20 +84,8 @@ use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; /// [interval]: crate::time::Interval #[derive(Debug)] pub(crate) struct Driver { - /// Timing backend in use. - time_source: TimeSource, - /// Parker to delegate to. park: IoStack, - - // When `true`, a call to `park_timeout` should immediately return and time - // should not advance. One reason for this to be `true` is if the task - // passed to `Runtime::block_on` called `task::yield_now()`. - // - // While it may look racy, it only has any effect when the clock is paused - // and pausing the clock is restricted to a single-threaded runtime. - #[cfg(feature = "test-util")] - did_wake: Arc, } /// Timer state shared between `Driver`, `Handle`, and `Registration`. @@ -108,15 +96,18 @@ struct Inner { /// True if the driver is being shutdown. pub(super) is_shutdown: AtomicBool, + // When `true`, a call to `park_timeout` should immediately return and time + // should not advance. One reason for this to be `true` is if the task + // passed to `Runtime::block_on` called `task::yield_now()`. + // + // While it may look racy, it only has any effect when the clock is paused + // and pausing the clock is restricted to a single-threaded runtime. #[cfg(feature = "test-util")] - did_wake: Arc, + did_wake: AtomicBool, } /// Time state shared which must be protected by a `Mutex` struct InnerState { - /// Timing backend in use. - time_source: TimeSource, - /// The last published timer `elapsed` value. elapsed: u64, @@ -137,31 +128,23 @@ impl Driver { pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) { let time_source = TimeSource::new(clock); - #[cfg(feature = "test-util")] - let did_wake = Arc::new(AtomicBool::new(false)); - - let inner = Arc::new(Inner { - state: Mutex::new(InnerState { - time_source: time_source.clone(), - elapsed: 0, - next_wake: None, - wheel: wheel::Wheel::new(), - }), - is_shutdown: AtomicBool::new(false), - - #[cfg(feature = "test-util")] - did_wake: did_wake.clone(), - }); - - let handle = Handle::new(inner); - - let driver = Driver { + let handle = Handle { time_source, - park, - #[cfg(feature = "test-util")] - did_wake, + inner: Inner { + state: Mutex::new(InnerState { + elapsed: 0, + next_wake: None, + wheel: wheel::Wheel::new(), + }), + is_shutdown: AtomicBool::new(false), + + #[cfg(feature = "test-util")] + did_wake: AtomicBool::new(false), + }, }; + let driver = Driver { park }; + (driver, handle) } @@ -180,7 +163,7 @@ impl Driver { return; } - handle.get().is_shutdown.store(true, Ordering::SeqCst); + handle.inner.is_shutdown.store(true, Ordering::SeqCst); // Advance time forward to the end of time. @@ -191,7 +174,7 @@ impl Driver { fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option) { let handle = rt_handle.time(); - let mut lock = handle.get().state.lock(); + let mut lock = handle.inner.state.lock(); assert!(!handle.is_shutdown()); @@ -203,11 +186,13 @@ impl Driver { match next_wake { Some(when) => { - let now = self.time_source.now(); + let now = handle.time_source.now(); // Note that we effectively round up to 1ms here - this avoids // very short-duration microsecond-resolution sleeps that the OS // might treat as zero-length. - let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now)); + let mut duration = handle + .time_source + .tick_to_duration(when.saturating_sub(now)); if duration > Duration::from_millis(0) { if let Some(limit) = limit { @@ -234,7 +219,8 @@ impl Driver { cfg_test_util! { fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { - let clock = &self.time_source.clock; + let handle = rt_handle.time(); + let clock = &handle.time_source.clock; if clock.is_paused() { self.park.park_timeout(rt_handle, Duration::from_secs(0)); @@ -243,7 +229,7 @@ impl Driver { // before the "duration" elapsed (usually caused by a // yield in `Runtime::block_on`). In this case, we don't // advance the clock. - if !self.did_wake() { + if !handle.did_wake() { // Simulate advancing time clock.advance(duration); } @@ -251,10 +237,6 @@ impl Driver { self.park.park_timeout(rt_handle, duration); } } - - fn did_wake(&self) -> bool { - self.did_wake.swap(false, Ordering::SeqCst) - } } cfg_not_test_util! { @@ -276,7 +258,7 @@ impl Handle { let mut waker_list: [Option; 32] = Default::default(); let mut waker_idx = 0; - let mut lock = self.get().lock(); + let mut lock = self.inner.lock(); if now < lock.elapsed { // Time went backwards! This normally shouldn't happen as the Rust language @@ -307,7 +289,7 @@ impl Handle { waker_idx = 0; - lock = self.get().lock(); + lock = self.inner.lock(); } } } @@ -338,7 +320,7 @@ impl Handle { /// `add_entry` must not be called concurrently. pub(self) unsafe fn clear_entry(&self, entry: NonNull) { unsafe { - let mut lock = self.get().lock(); + let mut lock = self.inner.lock(); if entry.as_ref().might_be_registered() { lock.wheel.remove(entry); @@ -361,7 +343,7 @@ impl Handle { entry: NonNull, ) { let waker = unsafe { - let mut lock = self.get().lock(); + let mut lock = self.inner.lock(); // We may have raced with a firing/deregistration, so check before // deregistering. @@ -408,6 +390,12 @@ impl Handle { waker.wake(); } } + + cfg_test_util! { + fn did_wake(&self) -> bool { + self.inner.did_wake.swap(false, Ordering::SeqCst) + } + } } // ===== impl Inner ===== diff --git a/tokio/src/runtime/time/source.rs b/tokio/src/runtime/time/source.rs index 1cdb86891d6..e6788edcaf8 100644 --- a/tokio/src/runtime/time/source.rs +++ b/tokio/src/runtime/time/source.rs @@ -3,7 +3,7 @@ use crate::time::{Clock, Duration, Instant}; use std::convert::TryInto; /// A structure which handles conversion from Instants to u64 timestamps. -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) struct TimeSource { pub(crate) clock: Clock, start_time: Instant, diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index b2e77515982..306b45d582c 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -1,5 +1,3 @@ -#[cfg(all(tokio_unstable, feature = "tracing"))] -use crate::runtime::time::TimeSource; use crate::runtime::time::TimerEntry; use crate::time::{error::Error, Duration, Instant}; use crate::util::trace; @@ -239,7 +237,6 @@ cfg_trace! { struct Inner { deadline: Instant, ctx: trace::AsyncOpTracingCtx, - time_source: TimeSource, } } @@ -266,7 +263,7 @@ impl Sleep { #[cfg(all(tokio_unstable, feature = "tracing"))] let inner = { let handle = &handle.time(); - let time_source = handle.time_source().clone(); + let time_source = handle.time_source(); let deadline_tick = time_source.deadline_to_tick(deadline); let duration = deadline_tick.saturating_sub(time_source.now()); @@ -303,7 +300,6 @@ impl Sleep { Inner { deadline, ctx, - time_source, } }; @@ -374,8 +370,8 @@ impl Sleep { } fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { - let me = self.project(); - me.entry.reset(deadline); + let mut me = self.project(); + me.entry.as_mut().reset(deadline); (*me.inner).deadline = deadline; #[cfg(all(tokio_unstable, feature = "tracing"))] @@ -389,8 +385,9 @@ impl Sleep { tracing::trace_span!("runtime.resource.async_op.poll"); let duration = { - let now = me.inner.time_source.now(); - let deadline_tick = me.inner.time_source.deadline_to_tick(deadline); + let time_source = me.entry.driver().time_source(); + let now = time_source.now(); + let deadline_tick = time_source.deadline_to_tick(deadline); deadline_tick.saturating_sub(now) };