diff --git a/tracing-appender/src/non_blocking.rs b/tracing-appender/src/non_blocking.rs index 009e4f5b61..65129e8fc6 100644 --- a/tracing-appender/src/non_blocking.rs +++ b/tracing-appender/src/non_blocking.rs @@ -51,7 +51,7 @@ use crate::Msg; use crossbeam_channel::{bounded, SendTimeoutError, Sender}; use std::io; use std::io::Write; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread::JoinHandle; @@ -124,11 +124,20 @@ pub struct WorkerGuard { /// [fmt]: mod@tracing_subscriber::fmt #[derive(Clone, Debug)] pub struct NonBlocking { - error_counter: Arc, + error_counter: ErrorCounter, channel: Sender, is_lossy: bool, } +/// Tracks the number of times a log line was dropped by the background thread. +/// +/// If the non-blocking writer is not configured in [lossy mode], the error +/// count should always be 0. +/// +/// [lossy mode]: NonBlockingBuilder::lossy +#[derive(Clone, Debug)] +pub struct ErrorCounter(Arc); + impl NonBlocking { /// Returns a new `NonBlocking` writer wrapping the provided `writer`. /// @@ -157,7 +166,7 @@ impl NonBlocking { ( Self { channel: sender, - error_counter: Arc::new(AtomicU64::new(0)), + error_counter: ErrorCounter(Arc::new(AtomicUsize::new(0))), is_lossy, }, worker_guard, @@ -166,7 +175,7 @@ impl NonBlocking { /// Returns a counter for the number of times logs where dropped. This will always return zero if /// `NonBlocking` is not lossy. - pub fn error_counter(&self) -> Arc { + pub fn error_counter(&self) -> ErrorCounter { self.error_counter.clone() } } @@ -218,7 +227,7 @@ impl std::io::Write for NonBlocking { let buf_size = buf.len(); if self.is_lossy { if self.channel.try_send(Msg::Line(buf.to_vec())).is_err() { - self.error_counter.fetch_add(1, Ordering::Release); + self.error_counter.incr_saturating(); } } else { return match self.channel.send(Msg::Line(buf.to_vec())) { @@ -279,6 +288,43 @@ impl Drop for WorkerGuard { } } +// === impl ErrorCounter === + +impl ErrorCounter { + /// Returns the number of log lines that have been dropped. + /// + /// If the non-blocking writer is not configured in [lossy mode], the error + /// count should always be 0. + /// + /// [lossy mode]: NonBlockingBuilder::lossy + pub fn dropped_lines(&self) -> usize { + self.0.load(Ordering::Acquire) + } + + fn incr_saturating(&self) { + let mut curr = self.0.load(Ordering::Acquire); + // We don't need to enter the CAS loop if the current value is already + // `usize::MAX`. + if curr == usize::MAX { + return; + } + + // This is implemented as a CAS loop rather than as a simple + // `fetch_add`, because we don't want to wrap on overflow. Instead, we + // need to ensure that saturating addition is performed. + loop { + let val = curr.saturating_add(1); + match self + .0 + .compare_exchange(curr, val, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => return, + Err(actual) => curr = actual, + } + } + } +} + #[cfg(test)] mod test { use super::*; @@ -321,7 +367,7 @@ mod test { let error_count = non_blocking.error_counter(); non_blocking.write_all(b"Hello").expect("Failed to write"); - assert_eq!(0, error_count.load(Ordering::Acquire)); + assert_eq!(0, error_count.dropped_lines()); let handle = thread::spawn(move || { non_blocking.write_all(b", World").expect("Failed to write"); @@ -330,7 +376,7 @@ mod test { // Sleep a little to ensure previously spawned thread gets blocked on write. thread::sleep(Duration::from_millis(100)); // We should not drop logs when blocked. - assert_eq!(0, error_count.load(Ordering::Acquire)); + assert_eq!(0, error_count.dropped_lines()); // Read the first message to unblock sender. let mut line = rx.recv().unwrap(); @@ -365,17 +411,17 @@ mod test { // First write will not block write_non_blocking(&mut non_blocking, b"Hello"); - assert_eq!(0, error_count.load(Ordering::Acquire)); + assert_eq!(0, error_count.dropped_lines()); // Second write will not block as Worker will have called `recv` on channel. // "Hello" is not yet consumed. MockWriter call to write_all will block until // "Hello" is consumed. write_non_blocking(&mut non_blocking, b", World"); - assert_eq!(0, error_count.load(Ordering::Acquire)); + assert_eq!(0, error_count.dropped_lines()); // Will sit in NonBlocking channel's buffer. write_non_blocking(&mut non_blocking, b"Test"); - assert_eq!(0, error_count.load(Ordering::Acquire)); + assert_eq!(0, error_count.dropped_lines()); // Allow a line to be written. "Hello" message will be consumed. // ", World" will be able to write to MockWriter. @@ -385,12 +431,12 @@ mod test { // This will block as NonBlocking channel is full. write_non_blocking(&mut non_blocking, b"Universe"); - assert_eq!(1, error_count.load(Ordering::Acquire)); + assert_eq!(1, error_count.dropped_lines()); // Finally the second message sent will be consumed. let line = rx.recv().unwrap(); assert_eq!(line, ", World"); - assert_eq!(1, error_count.load(Ordering::Acquire)); + assert_eq!(1, error_count.dropped_lines()); } #[test] @@ -426,6 +472,6 @@ mod test { } assert_eq!(10, hello_count); - assert_eq!(0, error_count.load(Ordering::Acquire)); + assert_eq!(0, error_count.dropped_lines()); } }