Skip to content

Commit

Permalink
appender: fix compilation 32-bit platforms like PowerPC (tokio-rs#1675)
Browse files Browse the repository at this point in the history
This branch is @dzvon's PR tokio-rs#1508, with the following changes:

* Add a newtype wrapping the error counter to hide that it's internally
  an `Arc<AtomicUsize>`. This would allow us to make additional changes
  to the implementation without potentially causing breaking changes.

* Use saturating arithmetic when incrementing the counter to avoid
  wrapping to 0 on overflows. This is more likely to be an issue on
  32-bit platforms.

This is a breaking change that will be released as part of 
`tracing-appender` 0.2.

Closes tokio-rs#1508

Description from @dzvon's original PR:

## Motivation

Currently, tracing-appender crate cannot be compiled on PowerPC
platform. Because

> PowerPC and MIPS platforms with 32-bit pointers do not have
> `AtomicU64` or `AtomicI64` types.

quote from std library docs.
(https://doc.rust-lang.org/std/sync/atomic/index.html#portability)

## Solution

Change `AtomicU64` to `AtomicUsize`.

Co-authored-by: Dezhi Wu <[email protected]>
  • Loading branch information
hawkw and dzvon authored Oct 22, 2021
1 parent 2b3aa06 commit c91504d
Showing 1 changed file with 59 additions and 13 deletions.
72 changes: 59 additions & 13 deletions tracing-appender/src/non_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::Msg;
use crossbeam_channel::{bounded, SendTimeoutError, Sender};
use std::io;
use std::io::Write;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::JoinHandle;
Expand Down Expand Up @@ -124,11 +124,20 @@ pub struct WorkerGuard {
/// [fmt]: mod@tracing_subscriber::fmt
#[derive(Clone, Debug)]
pub struct NonBlocking {
error_counter: Arc<AtomicU64>,
error_counter: ErrorCounter,
channel: Sender<Msg>,
is_lossy: bool,
}

/// Tracks the number of times a log line was dropped by the background thread.
///
/// If the non-blocking writer is not configured in [lossy mode], the error
/// count should always be 0.
///
/// [lossy mode]: NonBlockingBuilder::lossy
#[derive(Clone, Debug)]
pub struct ErrorCounter(Arc<AtomicUsize>);

impl NonBlocking {
/// Returns a new `NonBlocking` writer wrapping the provided `writer`.
///
Expand Down Expand Up @@ -157,7 +166,7 @@ impl NonBlocking {
(
Self {
channel: sender,
error_counter: Arc::new(AtomicU64::new(0)),
error_counter: ErrorCounter(Arc::new(AtomicUsize::new(0))),
is_lossy,
},
worker_guard,
Expand All @@ -166,7 +175,7 @@ impl NonBlocking {

/// Returns a counter for the number of times logs where dropped. This will always return zero if
/// `NonBlocking` is not lossy.
pub fn error_counter(&self) -> Arc<AtomicU64> {
pub fn error_counter(&self) -> ErrorCounter {
self.error_counter.clone()
}
}
Expand Down Expand Up @@ -218,7 +227,7 @@ impl std::io::Write for NonBlocking {
let buf_size = buf.len();
if self.is_lossy {
if self.channel.try_send(Msg::Line(buf.to_vec())).is_err() {
self.error_counter.fetch_add(1, Ordering::Release);
self.error_counter.incr_saturating();
}
} else {
return match self.channel.send(Msg::Line(buf.to_vec())) {
Expand Down Expand Up @@ -279,6 +288,43 @@ impl Drop for WorkerGuard {
}
}

// === impl ErrorCounter ===

impl ErrorCounter {
/// Returns the number of log lines that have been dropped.
///
/// If the non-blocking writer is not configured in [lossy mode], the error
/// count should always be 0.
///
/// [lossy mode]: NonBlockingBuilder::lossy
pub fn dropped_lines(&self) -> usize {
self.0.load(Ordering::Acquire)
}

fn incr_saturating(&self) {
let mut curr = self.0.load(Ordering::Acquire);
// We don't need to enter the CAS loop if the current value is already
// `usize::MAX`.
if curr == usize::MAX {
return;
}

// This is implemented as a CAS loop rather than as a simple
// `fetch_add`, because we don't want to wrap on overflow. Instead, we
// need to ensure that saturating addition is performed.
loop {
let val = curr.saturating_add(1);
match self
.0
.compare_exchange(curr, val, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => return,
Err(actual) => curr = actual,
}
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -321,7 +367,7 @@ mod test {
let error_count = non_blocking.error_counter();

non_blocking.write_all(b"Hello").expect("Failed to write");
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

let handle = thread::spawn(move || {
non_blocking.write_all(b", World").expect("Failed to write");
Expand All @@ -330,7 +376,7 @@ mod test {
// Sleep a little to ensure previously spawned thread gets blocked on write.
thread::sleep(Duration::from_millis(100));
// We should not drop logs when blocked.
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

// Read the first message to unblock sender.
let mut line = rx.recv().unwrap();
Expand Down Expand Up @@ -365,17 +411,17 @@ mod test {

// First write will not block
write_non_blocking(&mut non_blocking, b"Hello");
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

// Second write will not block as Worker will have called `recv` on channel.
// "Hello" is not yet consumed. MockWriter call to write_all will block until
// "Hello" is consumed.
write_non_blocking(&mut non_blocking, b", World");
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

// Will sit in NonBlocking channel's buffer.
write_non_blocking(&mut non_blocking, b"Test");
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

// Allow a line to be written. "Hello" message will be consumed.
// ", World" will be able to write to MockWriter.
Expand All @@ -385,12 +431,12 @@ mod test {

// This will block as NonBlocking channel is full.
write_non_blocking(&mut non_blocking, b"Universe");
assert_eq!(1, error_count.load(Ordering::Acquire));
assert_eq!(1, error_count.dropped_lines());

// Finally the second message sent will be consumed.
let line = rx.recv().unwrap();
assert_eq!(line, ", World");
assert_eq!(1, error_count.load(Ordering::Acquire));
assert_eq!(1, error_count.dropped_lines());
}

#[test]
Expand Down Expand Up @@ -426,6 +472,6 @@ mod test {
}

assert_eq!(10, hello_count);
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());
}
}

0 comments on commit c91504d

Please sign in to comment.