diff --git a/examples/Cargo.toml b/examples/Cargo.toml index c3dc6091518..fe3c90f9a56 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -7,7 +7,9 @@ edition = "2018" # If you copy one of the examples into a new project, you should be using # [dependencies] instead. [dev-dependencies] -tokio = { version = "0.2.0", path = "../tokio", features = ["full"] } +tokio = { version = "0.2.0", path = "../tokio", features = ["full", "tracing"] } +tracing = "0.1" +tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] } tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] } bytes = "0.5" futures = "0.3.0" diff --git a/examples/chat.rs b/examples/chat.rs index b3fb727a2cc..c4b8c6a2afc 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -43,6 +43,26 @@ use std::task::{Context, Poll}; #[tokio::main] async fn main() -> Result<(), Box> { + use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter}; + // Configure a `tracing` subscriber that logs traces emitted by the chat + // server. + tracing_subscriber::fmt() + // Filter what traces are displayed based on the RUST_LOG environment + // variable. + // + // Traces emitted by the example code will always be displayed. You + // can set `RUST_LOG=tokio=trace` to enable additional traces emitted by + // Tokio itself. + .with_env_filter(EnvFilter::from_default_env().add_directive("chat=info".parse()?)) + // Log events when `tracing` spans are created, entered, exited, or + // closed. When Tokio's internal tracing support is enabled (as + // described above), this can be used to track the lifecycle of spawned + // tasks on the Tokio runtime. + .with_span_events(FmtSpan::FULL) + // Set this subscriber as the default, to collect all traces emitted by + // the program. + .init(); + // Create the shared state. This is how all the peers communicate. // // The server task will hold a handle to this. For every new client, the @@ -59,7 +79,7 @@ async fn main() -> Result<(), Box> { // Note that this is the Tokio TcpListener, which is fully async. let mut listener = TcpListener::bind(&addr).await?; - println!("server running on {}", addr); + tracing::info!("server running on {}", addr); loop { // Asynchronously wait for an inbound TcpStream. @@ -70,8 +90,9 @@ async fn main() -> Result<(), Box> { // Spawn our handler to be run asynchronously. tokio::spawn(async move { + tracing::debug!("accepted connection"); if let Err(e) = process(state, stream, addr).await { - println!("an error occurred; error = {:?}", e); + tracing::info!("an error occurred; error = {:?}", e); } }); } @@ -200,7 +221,7 @@ async fn process( Some(Ok(line)) => line, // We didn't get a line so we return early here. _ => { - println!("Failed to get username from {}. Client disconnected.", addr); + tracing::error!("Failed to get username from {}. Client disconnected.", addr); return Ok(()); } }; @@ -212,7 +233,7 @@ async fn process( { let mut state = state.lock().await; let msg = format!("{} has joined the chat", username); - println!("{}", msg); + tracing::info!("{}", msg); state.broadcast(addr, &msg).await; } @@ -233,9 +254,10 @@ async fn process( peer.lines.send(&msg).await?; } Err(e) => { - println!( + tracing::error!( "an error occurred while processing messages for {}; error = {:?}", - username, e + username, + e ); } } @@ -248,7 +270,7 @@ async fn process( state.peers.remove(&addr); let msg = format!("{} has left the chat", username); - println!("{}", msg); + tracing::info!("{}", msg); state.broadcast(addr, &msg).await; } diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index e596375d630..969fbcf01c5 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -106,6 +106,7 @@ iovec = { version = "0.1.4", optional = true } num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.10.0", optional = true } # Not in full slab = { version = "0.4.1", optional = true } # Backs `DelayQueue` +tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] mio-uds = { version = "0.6.5", optional = true } diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 288f58d2f40..4b77544eb5c 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -364,6 +364,25 @@ macro_rules! cfg_unstable { } } +macro_rules! cfg_trace { + ($($item:item)*) => { + $( + #[cfg(feature = "tracing")] + #[cfg_attr(docsrs, doc(cfg(feature = "tracing")))] + $item + )* + } +} + +macro_rules! cfg_not_trace { + ($($item:item)*) => { + $( + #[cfg(not(feature = "tracing"))] + $item + )* + } +} + macro_rules! cfg_coop { ($($item:item)*) => { $( diff --git a/tokio/src/task/blocking.rs b/tokio/src/task/blocking.rs index 4dab333eb9b..ed60f4c4734 100644 --- a/tokio/src/task/blocking.rs +++ b/tokio/src/task/blocking.rs @@ -114,6 +114,19 @@ cfg_blocking! { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { + #[cfg(feature = "tracing")] + let f = { + let span = tracing::trace_span!( + target: "tokio::task", + "task", + kind = %"blocking", + function = %std::any::type_name::(), + ); + move || { + let _g = span.enter(); + f() + } + }; crate::runtime::spawn_blocking(f) } } diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 2a3a7e1e3ff..3c409edfb90 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -195,6 +195,7 @@ cfg_rt_util! { F: Future + 'static, F::Output: 'static, { + let future = crate::util::trace::task(future, "local"); CURRENT.with(|maybe_cx| { let cx = maybe_cx .expect("`spawn_local` called from outside of a `task::LocalSet`"); @@ -277,6 +278,7 @@ impl LocalSet { F: Future + 'static, F::Output: 'static, { + let future = crate::util::trace::task(future, "local"); let (task, handle) = unsafe { task::joinable_local(future) }; self.context.tasks.borrow_mut().queue.push_back(task); handle diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index fa5ff13b01e..d6e771184f2 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -129,6 +129,7 @@ doc_rt_core! { { let spawn_handle = runtime::context::spawn_handle() .expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`"); + let task = crate::util::trace::task(task, "task"); spawn_handle.spawn(task) } } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 8194dbaf65a..6dda08ca411 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -19,6 +19,8 @@ cfg_rt_threaded! { pub(crate) use try_lock::TryLock; } +pub(crate) mod trace; + #[cfg(any(feature = "macros", feature = "stream"))] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] pub use rand::thread_rng_n; diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs new file mode 100644 index 00000000000..d8c6120d97c --- /dev/null +++ b/tokio/src/util/trace.rs @@ -0,0 +1,57 @@ +cfg_trace! { + cfg_rt_core! { + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + use pin_project_lite::pin_project; + + use tracing::Span; + + pin_project! { + /// A future that has been instrumented with a `tracing` span. + #[derive(Debug, Clone)] + pub(crate) struct Instrumented { + #[pin] + inner: T, + span: Span, + } + } + + impl Future for Instrumented { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let _enter = this.span.enter(); + this.inner.poll(cx) + } + } + + impl Instrumented { + pub(crate) fn new(inner: T, span: Span) -> Self { + Self { inner, span } + } + } + + #[inline] + pub(crate) fn task(task: F, kind: &'static str) -> Instrumented { + let span = tracing::trace_span!( + target: "tokio::task", + "task", + %kind, + future = %std::any::type_name::(), + ); + Instrumented::new(task, span) + } + } +} + +cfg_not_trace! { + cfg_rt_core! { + #[inline] + pub(crate) fn task(task: F, _: &'static str) -> F { + // nop + task + } + } +}