From 95fdb294de38c62bd8cca90d64eaf66f988ff444 Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Tue, 5 Dec 2023 19:58:28 +0000 Subject: [PATCH] runtime: skip notified tasks during taskdumps Fixes #6051. --- tokio/src/runtime/task/mod.rs | 12 ++++-- tokio/src/runtime/task/state.rs | 14 ++++--- tokio/src/runtime/task/trace/mod.rs | 31 +++++++++----- tokio/tests/dump.rs | 63 +++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+), 19 deletions(-) diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index d7fde0fe67d..70b160d4dda 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -363,10 +363,14 @@ impl Task { } cfg_taskdump! { - pub(super) fn notify_for_tracing(&self) -> Notified { - self.as_raw().state().transition_to_notified_for_tracing(); - // SAFETY: `transition_to_notified_for_tracing` increments the refcount. - unsafe { Notified(Task::new(self.raw)) } + pub(super) fn notify_for_tracing(&self) -> Option> { + if self.as_raw().state().transition_to_notified_for_tracing() { + // SAFETY: `transition_to_notified_for_tracing` increments the + // refcount. + Some(unsafe { Notified(Task::new(self.raw)) }) + } else { + None + } } } } diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 64cfb4b5db1..3f20c086cb0 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -278,12 +278,16 @@ impl State { target_os = "linux", any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") ))] - pub(super) fn transition_to_notified_for_tracing(&self) { + pub(super) fn transition_to_notified_for_tracing(&self) -> bool { self.fetch_update_action(|mut snapshot| { - snapshot.set_notified(); - snapshot.ref_inc(); - ((), Some(snapshot)) - }); + if snapshot.is_notified() { + (false, None) + } else { + snapshot.set_notified(); + snapshot.ref_inc(); + (true, Some(snapshot)) + } + }) } /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle. diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 185d682a47c..1e143373c8c 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -272,14 +272,19 @@ pub(in crate::runtime) fn trace_current_thread( injection: &Inject>, ) -> Vec { // clear the local and injection queues - local.clear(); + + let mut dequeued = Vec::new(); + + while let Some(task) = local.pop_back() { + dequeued.push(task); + } while let Some(task) = injection.pop() { - drop(task); + dequeued.push(task); } // precondition: We have drained the tasks from the injection queue. - trace_owned(owned) + trace_owned(owned, dequeued) } cfg_rt_multi_thread! { @@ -299,22 +304,24 @@ cfg_rt_multi_thread! { synced: &Mutex, injection: &Shared>, ) -> Vec { + let mut dequeued = Vec::new(); + // clear the local queue while let Some(notified) = local.pop() { - drop(notified); + dequeued.push(notified); } // clear the injection queue let mut synced = synced.lock(); while let Some(notified) = injection.pop(&mut synced.inject) { - drop(notified); + dequeued.push(notified); } drop(synced); // precondition: we have drained the tasks from the local and injection // queues. - trace_owned(owned) + trace_owned(owned, dequeued) } } @@ -324,12 +331,16 @@ cfg_rt_multi_thread! { /// /// This helper presumes exclusive access to each task. The tasks must not exist /// in any other queue. -fn trace_owned(owned: &OwnedTasks) -> Vec { +fn trace_owned(owned: &OwnedTasks, dequeued: Vec>) -> Vec { // notify each task - let mut tasks = vec![]; + let mut tasks = dequeued; owned.for_each(|task| { - // notify the task (and thus make it poll-able) and stash it - tasks.push(task.notify_for_tracing()); + // Notify the task (and thus make it poll-able) and stash it. This fails + // if the task is already notified. In these cases, we skip tracing the + // task. + if let Some(notified) = task.notify_for_tracing() { + tasks.push(notified); + } // we do not poll it here since we hold a lock on `owned` and the task // may complete and need to remove itself from `owned`. }); diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs index 4da0c9e8e18..402560397ca 100644 --- a/tokio/tests/dump.rs +++ b/tokio/tests/dump.rs @@ -147,6 +147,7 @@ mod future_completes_during_trace { async fn dump() { let handle = Handle::current(); let _dump = handle.dump().await; + tokio::task::yield_now().await; } rt.block_on(async { @@ -154,3 +155,65 @@ mod future_completes_during_trace { }); } } + +/// Regression tests for #6051. +/// +/// These tests ensure that tasks notified outside of a worker will not be +/// traced, since doing so will un-set their notified bit prior to them being +/// run and panic. +mod notified_during_tracing { + use super::*; + + fn test(rt: tokio::runtime::Runtime) { + async fn dump() { + loop { + let handle = Handle::current(); + let _dump = handle.dump().await; + // TODO: This tests hangs with this commented out. Why? + // tokio::task::yield_now().await; + } + } + + rt.block_on(async { + let timer = tokio::spawn(async { + loop { + tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await; + } + }); + + let timeout = async { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + }; + + tokio::select!( + biased; + _ = timeout => {}, + _ = timer => {}, + _ = dump() => {}, + ); + }); + } + + #[test] + // TODO: This currently hangs, with or without the regression fix. Adding a + // `yield_now` to the end of `fn dump()` above fixes the issue, but why? + fn current_thread() { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + test(rt) + } + + #[test] + fn multi_thread() { + let rt = runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(3) + .build() + .unwrap(); + + test(rt) + } +}