From 1e5f26af65e7cbf6265805b56938ba1b231da21c Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 31 Oct 2022 15:30:14 -0700 Subject: [PATCH] rt: move `coop` mod into `runtime` This is a step towards unifying thread-local variables. In the future, `coop` will be updated to use the runtime context thread-local to store its state. --- tokio/src/io/util/empty.rs | 2 +- tokio/src/io/util/mem.rs | 4 ++-- tokio/src/lib.rs | 9 --------- tokio/src/park/thread.rs | 2 +- tokio/src/process/mod.rs | 2 +- tokio/src/runtime/blocking/task.rs | 2 +- tokio/src/{ => runtime}/coop.rs | 0 tokio/src/runtime/enter.rs | 2 +- tokio/src/runtime/io/registration.rs | 2 +- tokio/src/runtime/mod.rs | 2 ++ tokio/src/runtime/scheduler/current_thread.rs | 4 ++-- tokio/src/runtime/scheduler/multi_thread/worker.rs | 3 +-- tokio/src/runtime/task/join.rs | 2 +- tokio/src/sync/batch_semaphore.rs | 4 ++-- tokio/src/sync/mpsc/chan.rs | 2 +- tokio/src/sync/oneshot.rs | 4 ++-- tokio/src/task/consume_budget.rs | 2 +- tokio/src/task/local.rs | 4 ++-- tokio/src/task/unconstrained.rs | 2 +- tokio/src/time/sleep.rs | 4 ++-- tokio/src/time/timeout.rs | 2 +- 21 files changed, 26 insertions(+), 34 deletions(-) rename tokio/src/{ => runtime}/coop.rs (100%) diff --git a/tokio/src/io/util/empty.rs b/tokio/src/io/util/empty.rs index 77db60e40b4..9e648f87e62 100644 --- a/tokio/src/io/util/empty.rs +++ b/tokio/src/io/util/empty.rs @@ -77,7 +77,7 @@ impl fmt::Debug for Empty { cfg_coop! { fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> { - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); coop.made_progress(); Poll::Ready(()) } diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 4019db56ff4..31884b39614 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -233,7 +233,7 @@ impl AsyncRead for Pipe { cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ret = self.poll_read_internal(cx, buf); if ret.is_ready() { @@ -261,7 +261,7 @@ impl AsyncWrite for Pipe { cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ret = self.poll_write_internal(cx, buf); if ret.is_ready() { diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index d60c80ceac0..95947148ef3 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -497,18 +497,9 @@ cfg_rt! { pub mod runtime; } cfg_not_rt! { - #[cfg(any( - feature = "macros", - feature = "net", - feature = "time", - all(unix, feature = "process"), - all(unix, feature = "signal"), - ))] pub(crate) mod runtime; } -pub(crate) mod coop; - cfg_signal! { pub mod signal; } diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index b0c3cc6df2b..9dfbfae0951 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -269,7 +269,7 @@ impl CachedParkThread { pin!(f); loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { return Ok(v); } diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 2e2507e7792..7e1e75d3112 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -954,7 +954,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ret = Pin::new(&mut self.inner).poll(cx); diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs index 0b7803a6c0b..c4461754005 100644 --- a/tokio/src/runtime/blocking/task.rs +++ b/tokio/src/runtime/blocking/task.rs @@ -37,7 +37,7 @@ where // currently goes through Task::poll(), and so is subject to budgeting. That isn't really // what we want; a blocking task may itself want to run tasks (it might be a Worker!), so // we want it to start without any budgeting. - crate::coop::stop(); + crate::runtime::coop::stop(); Poll::Ready(func()) } diff --git a/tokio/src/coop.rs b/tokio/src/runtime/coop.rs similarity index 100% rename from tokio/src/coop.rs rename to tokio/src/runtime/coop.rs diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index be860711b03..fec5887eb84 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -173,7 +173,7 @@ cfg_rt! { let when = Instant::now() + timeout; loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { return Ok(v); } diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index a0f2a349289..7b95f7f0409 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -145,7 +145,7 @@ impl Registration { direction: Direction, ) -> Poll> { // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); if self.handle().is_shutdown() { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 0ec9d683a5e..1072274bf4b 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -180,6 +180,8 @@ mod tests; #[cfg(any(feature = "rt", feature = "macros"))] pub(crate) mod context; +pub(crate) mod coop; + mod driver; pub(crate) mod scheduler; diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index d5124d87383..778f93a84c5 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -290,7 +290,7 @@ impl Context { /// thread-local context. fn run_task(&self, mut core: Box, f: impl FnOnce() -> R) -> (Box, R) { core.metrics.incr_poll_count(); - self.enter(core, || crate::coop::budget(f)) + self.enter(core, || crate::runtime::coop::budget(f)) } /// Blocks the current thread until an event is received by the driver, @@ -533,7 +533,7 @@ impl CoreGuard<'_> { if handle.reset_woken() { let (c, res) = context.enter(core, || { - crate::coop::budget(|| future.as_mut().poll(&mut cx)) + crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx)) }); core = c; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 1e489b7e5c0..b21a11923d9 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -56,14 +56,13 @@ //! the inject queue indefinitely. This would be a ref-count cycle and a memory //! leak. -use crate::coop; use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::scheduler::multi_thread::{queue, Handle, Idle, Parker, Unparker}; use crate::runtime::task::{Inject, OwnedTasks}; use crate::runtime::{ - blocking, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics, + blocking, coop, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics, }; use crate::util::atomic_cell::AtomicCell; use crate::util::rand::{FastRand, RngSeedGenerator}; diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index b2138b14596..31f6a6f8748 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -295,7 +295,7 @@ impl Future for JoinHandle { let mut ret = Poll::Pending; // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); // Raw should always be set. If it is not, this is due to polling after // completion diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 23e6e2adfb2..101c59368f0 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -540,11 +540,11 @@ impl Future for Acquire<'_> { #[cfg(all(tokio_unstable, feature = "tracing"))] let coop = ready!(trace_poll_op!( "poll_acquire", - crate::coop::poll_proceed(cx), + crate::runtime::coop::poll_proceed(cx), )); #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let result = match semaphore.poll_acquire(cx, needed, node, *queued) { Pending => { diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 076d925d62f..92f057b068f 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -243,7 +243,7 @@ impl Rx { use super::block::Read::*; // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 07b39d077b7..1ceeab1b03b 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -785,7 +785,7 @@ impl Sender { /// ``` pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let inner = self.inner.as_ref().unwrap(); @@ -1124,7 +1124,7 @@ impl Inner { fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); // Load the state let mut state = State::load(&self.state, Acquire); diff --git a/tokio/src/task/consume_budget.rs b/tokio/src/task/consume_budget.rs index 7c46444b7ea..1212cfccd77 100644 --- a/tokio/src/task/consume_budget.rs +++ b/tokio/src/task/consume_budget.rs @@ -36,7 +36,7 @@ pub async fn consume_budget() { if status.is_ready() { return status; } - status = crate::coop::poll_proceed(cx).map(|restore| { + status = crate::runtime::coop::poll_proceed(cx).map(|restore| { restore.made_progress(); }); status diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 739d0623b5c..17935831708 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -607,7 +607,7 @@ impl LocalSet { // task initially. Because `LocalSet` itself is `!Send`, and // `spawn_local` spawns into the `LocalSet` on the current // thread, the invariant is maintained. - Some(task) => crate::coop::budget(|| task.run()), + Some(task) => crate::runtime::coop::budget(|| task.run()), // We have fully drained the queue of notified tasks, so the // local future doesn't need to be notified again — it can wait // until something else wakes a task in the local set. @@ -893,7 +893,7 @@ impl Future for RunUntil<'_, T> { let _no_blocking = crate::runtime::enter::disallow_block_in_place(); let f = me.future; - if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) { + if let Poll::Ready(output) = crate::runtime::coop::budget(|| f.poll(cx)) { return Poll::Ready(output); } diff --git a/tokio/src/task/unconstrained.rs b/tokio/src/task/unconstrained.rs index 31c732bfc95..40384c8709e 100644 --- a/tokio/src/task/unconstrained.rs +++ b/tokio/src/task/unconstrained.rs @@ -22,7 +22,7 @@ where cfg_coop! { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let inner = self.project().inner; - crate::coop::with_unconstrained(|| inner.poll(cx)) + crate::runtime::coop::with_unconstrained(|| inner.poll(cx)) } } diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index a20651373de..d974e1ab282 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -392,11 +392,11 @@ impl Sleep { #[cfg(all(tokio_unstable, feature = "tracing"))] let coop = ready!(trace_poll_op!( "poll_elapsed", - crate::coop::poll_proceed(cx), + crate::runtime::coop::poll_proceed(cx), )); #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let coop = ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let result = me.entry.poll_elapsed(cx).map(move |r| { coop.made_progress(); diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index c6adfd935b7..3bb98ea6f92 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -5,7 +5,7 @@ //! [`Timeout`]: struct@Timeout use crate::{ - coop, + runtime::coop, time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, util::trace, };