diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 12c3c813656..17ae2a0ee94 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -43,6 +43,9 @@ futures-util = { version = "0.3.0", optional = true } pin-project-lite = "0.2.11" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } +nix = "0.21" +rand = "0.8" +libc = "0.2" [target.'cfg(tokio_unstable)'.dependencies] hashbrown = { version = "0.14.0", optional = true } diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 22ad92b8c4b..435a925526a 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -51,6 +51,8 @@ cfg_time! { pub mod time; } +pub mod lrtd; + pub mod sync; pub mod either; diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs new file mode 100644 index 00000000000..9159cbd385a --- /dev/null +++ b/tokio-util/src/lrtd/lrtd.rs @@ -0,0 +1,194 @@ +use libc; +use nix::sys::signal::{self, SaFlags, SigAction, SigHandler, SigSet, Signal}; +use rand::Rng; +use std::backtrace::Backtrace; +use std::collections::HashSet; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use std::{env, thread}; +use tokio::runtime::{Builder, Runtime}; + +const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60); + +fn get_panic_worker_block_duration() -> Duration { + let duration_str = env::var("MY_DURATION_ENV").unwrap_or_else(|_| "60".to_string()); + duration_str + .parse::() + .map(Duration::from_secs) + .unwrap_or(PANIC_WORKER_BLOCK_DURATION_DEFAULT) +} + +fn get_thread_id() -> libc::pthread_t { + unsafe { libc::pthread_self() } +} + +#[derive(Debug)] +struct WorkerSet { + inner: Mutex>, +} + +impl WorkerSet { + fn new() -> Self { + WorkerSet { + inner: Mutex::new(HashSet::new()), + } + } + + fn add(&self, pid: libc::pthread_t) { + let mut set = self.inner.lock().unwrap(); + set.insert(pid); + } + + fn remove(&self, pid: libc::pthread_t) { + let mut set = self.inner.lock().unwrap(); + set.remove(&pid); + } + + fn get_all(&self) -> Vec { + let set = self.inner.lock().unwrap(); + set.iter().cloned().collect() + } +} + +extern "C" fn signal_handler(_: i32) { + let thread = thread::current(); + let name = thread + .name() + .map(|n| format!(" for thread \"{}\"", n)) + .unwrap_or("".to_owned()); + let trace = Backtrace::force_capture(); + eprintln!("Stack trace{}:{}\n{}", name, get_thread_id(), trace); +} + +pub fn install_thread_stack_stace_handler(signal: Signal) { + let mut sigset = SigSet::empty(); + sigset.add(signal); + + // Set up a signal action + let sa = SigAction::new( + SigHandler::Handler(signal_handler), + SaFlags::empty(), + sigset, + ); + + // Register the signal action for process + unsafe { + signal::sigaction(signal, &sa).expect("Failed to register signal handler"); + } +} + +pub fn install_thread_stack_stace_handler_default() { + let default_signal = Signal::SIGUSR1; + install_thread_stack_stace_handler(default_signal); +} + +fn signal_all_threads(signal: Signal, targets: Vec) { + for thread_id in &targets { + let result = unsafe { + libc::pthread_kill( + *thread_id, + match signal.into() { + Some(s) => s as libc::c_int, + None => 0, + }, + ) + }; + if result != 0 { + eprintln!("Error sending signal: {:?}", result); + } + } +} + +#[derive(Debug)] +pub struct LongRunningTaskDetector { + interval: Duration, + detection_time: Duration, + stop_flag: Arc>, + workers: Arc, + signal: Signal, +} + +async fn do_nothing(tx: mpsc::Sender<()>) { + // signal I am done + let _ = tx.send(()).unwrap(); +} + +fn probe( + tokio_runtime: &Arc, + detection_time: Duration, + signal: Signal, + workers: &Arc, +) { + let (tx, rx) = mpsc::channel(); + let _nothing_handle = tokio_runtime.spawn(do_nothing(tx)); + let is_probe_success = match rx.recv_timeout(detection_time) { + Ok(_result) => true, + Err(_) => false, + }; + if !is_probe_success { + let targets = workers.get_all(); + eprintln!( + "Detected worker blocking, signaling worker threads: {:?}", + targets + ); + signal_all_threads(signal, targets); + // Wait for our probe to eventually finish, we do not want to have multiple probes running at the same time. + let _ = rx.recv_timeout(get_panic_worker_block_duration()).unwrap(); + } +} + +impl LongRunningTaskDetector { + pub fn new( + interval: Duration, + detection_time: Duration, + signal: Signal, + runtime_builder: &mut Builder, + ) -> Self { + let workers = Arc::new(WorkerSet::new()); + if runtime_builder.is_current_threaded() { + workers.add(get_thread_id()); + } else { + let workers_clone = Arc::clone(&workers); + let workers_clone2 = Arc::clone(&workers); + runtime_builder + .on_thread_start(move || { + let pid = get_thread_id(); + workers_clone.add(pid); + }) + .on_thread_stop(move || { + let pid = get_thread_id(); + workers_clone2.remove(pid); + }); + } + LongRunningTaskDetector { + interval, + detection_time, + stop_flag: Arc::new(Mutex::new(false)), + workers, + signal, + } + } + + pub fn start(&self, runtime: Arc) { + let stop_flag = Arc::clone(&self.stop_flag); + let detection_time = self.detection_time.clone(); + let interval = self.interval.clone(); + let signal = self.signal.clone(); + let tokio_runtime = runtime.clone(); + let workers = Arc::clone(&self.workers); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + while !*stop_flag.lock().unwrap() { + probe(&tokio_runtime, detection_time, signal, &workers); + thread::sleep(Duration::from_micros( + rng.gen_range(1..=interval.as_micros()).try_into().unwrap(), + )); + } + }); + } + + pub fn stop(&self) { + *self.stop_flag.lock().unwrap() = true; + } +} diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs new file mode 100644 index 00000000000..be4b0832b10 --- /dev/null +++ b/tokio-util/src/lrtd/mod.rs @@ -0,0 +1,5 @@ +mod lrtd; + +pub use self::lrtd::install_thread_stack_stace_handler; +pub use self::lrtd::install_thread_stack_stace_handler_default; +pub use self::lrtd::LongRunningTaskDetector; \ No newline at end of file diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs new file mode 100644 index 00000000000..f8c4c8bbf36 --- /dev/null +++ b/tokio-util/tests/lrtd.rs @@ -0,0 +1,63 @@ +use nix::sys::signal::Signal; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use tokio_util::lrtd::{install_thread_stack_stace_handler, LongRunningTaskDetector}; + +async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(2)); + println!("slow done"); +} + +fn install_thread_stack_stace_handler_default() { + install_thread_stack_stace_handler(Signal::SIGUSR1); +} + +#[test] +fn test_blocking_detection_multi() { + install_thread_stack_stace_handler_default(); + let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mutable_builder = builder.worker_threads(2); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + Signal::SIGUSR1, + mutable_builder, + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime.clone()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + println!("Hello world"); + }); + lrtd.stop() +} + +#[test] +fn test_blocking_detection_current() { + install_thread_stack_stace_handler_default(); + let mut builder = tokio::runtime::Builder::new_current_thread(); + let mutable_builder = builder.enable_all(); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + Signal::SIGUSR1, + mutable_builder, + ); + let runtime = mutable_builder.build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime); + arc_runtime2.block_on(async { + run_blocking_stuff().await; + println!("Sleeping"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + println!("Hello world"); + }); + lrtd.stop() +} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 78e6bf50d62..f4eeb775984 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -318,6 +318,17 @@ impl Builder { } } + /// Returns true if kind is "CurrentThread" of this [`Builder`]. False otherwise. + pub fn is_current_threaded(&self) -> bool { + match &self.kind { + Kind::CurrentThread => true, + #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + Kind::MultiThread => false, + #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + Kind::MultiThreadAlt => false, + } + } + /// Enables both I/O and time drivers. /// /// Doing this is a shorthand for calling `enable_io` and `enable_time`