Skip to content

Commit

Permalink
Avoid writing length on each IO for async channel (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
vstakhov authored Sep 25, 2023
1 parent 6d5d8eb commit a5a9330
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 7 deletions.
26 changes: 21 additions & 5 deletions metered-channel/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ impl<T> MeteredReceiver<T> {

/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
// For async_channel we can update channel length in the meter access
// to avoid more expensive updates on each RW operation
#[cfg(feature = "async_channel")]
self.meter.note_channel_len(self.len());

&self.meter
}

Expand All @@ -262,7 +267,6 @@ impl<T> MeteredReceiver<T> {
Err(err) => Err(err),
};

self.meter.note_channel_len(self.len());
result
}

Expand All @@ -275,7 +279,6 @@ impl<T> MeteredReceiver<T> {
Err(err) => Err(err.into()),
};

self.meter.note_channel_len(self.len());
result
}

Expand All @@ -288,7 +291,6 @@ impl<T> MeteredReceiver<T> {
Err(err) => Err(err),
};

self.meter.note_channel_len(self.len());
result
}

Expand All @@ -297,6 +299,12 @@ impl<T> MeteredReceiver<T> {
pub fn len(&self) -> usize {
self.inner.len()
}

#[cfg(feature = "futures_channel")]
/// Returns the current number of messages in the channel based on meter approximation
pub fn len(&self) -> usize {
self.meter.calculate_channel_len()
}
}

impl<T> futures::stream::FusedStream for MeteredReceiver<T> {
Expand Down Expand Up @@ -346,6 +354,10 @@ impl<T> MeteredSender<T> {

/// Get an updated accessor object for all metrics collected.
pub fn meter(&self) -> &Meter {
// For async_channel we can update channel length in the meter access
// to avoid more expensive updates on each RW operation
#[cfg(feature = "async_channel")]
self.meter.note_channel_len(self.len());
&self.meter
}

Expand Down Expand Up @@ -382,7 +394,6 @@ impl<T> MeteredSender<T> {
SendError::Closed(err.0.into())
});

self.meter.note_channel_len(self.len());
result
}

Expand Down Expand Up @@ -420,7 +431,6 @@ impl<T> MeteredSender<T> {
TrySendError::from(e)
});

self.meter.note_channel_len(self.len());
result
}

Expand All @@ -429,4 +439,10 @@ impl<T> MeteredSender<T> {
pub fn len(&self) -> usize {
self.inner.len()
}

#[cfg(feature = "futures_channel")]
/// Returns the current number of messages in the channel based on meter approximation
pub fn len(&self) -> usize {
self.meter.calculate_channel_len()
}
}
14 changes: 12 additions & 2 deletions metered-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ use coarsetime::Instant as CoarseInstant;
#[cfg(test)]
mod tests;

/// Defines the maximum number of time of flight values to be stored.
const TOF_QUEUE_SIZE: usize = 100;

/// A peek into the inner state of a meter.
#[derive(Debug, Clone)]
pub struct Meter {
Expand All @@ -54,7 +57,7 @@ pub struct Meter {
channel_len: Arc<AtomicUsize>,
// Number of times senders blocked while sending messages to a subsystem.
blocked: Arc<AtomicUsize>,
// Atomic ringbuffer of the last 50 time of flight values
// Atomic ringbuffer of the last `TOF_QUEUE_SIZE` time of flight values
tof: Arc<crossbeam_queue::ArrayQueue<CoarseDuration>>,
}

Expand All @@ -66,7 +69,7 @@ impl std::default::Default for Meter {
#[cfg(feature = "async_channel")]
channel_len: Arc::new(AtomicUsize::new(0)),
blocked: Arc::new(AtomicUsize::new(0)),
tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)),
tof: Arc::new(crossbeam_queue::ArrayQueue::new(TOF_QUEUE_SIZE)),
}
}
}
Expand Down Expand Up @@ -140,6 +143,13 @@ impl Meter {
fn note_time_of_flight(&self, tof: CoarseDuration) {
let _ = self.tof.force_push(tof);
}

#[cfg(feature = "futures_channel")]
fn calculate_channel_len(&self) -> usize {
let sent = self.sent.load(Ordering::Relaxed);
let received = self.received.load(Ordering::Relaxed);
sent.saturating_sub(received) as usize
}
}

/// Determine if this instance shall be measured
Expand Down

0 comments on commit a5a9330

Please sign in to comment.