diff --git a/metered-channel/src/bounded.rs b/metered-channel/src/bounded.rs index 8280e93..7efcc57 100644 --- a/metered-channel/src/bounded.rs +++ b/metered-channel/src/bounded.rs @@ -242,6 +242,11 @@ impl MeteredReceiver { /// Get an updated accessor object for all metrics collected. pub fn meter(&self) -> &Meter { + // For async_channel we can update channel length in the meter access + // to avoid more expensive updates on each RW operation + #[cfg(feature = "async_channel")] + self.meter.note_channel_len(self.len()); + &self.meter } @@ -262,7 +267,6 @@ impl MeteredReceiver { Err(err) => Err(err), }; - self.meter.note_channel_len(self.len()); result } @@ -275,7 +279,6 @@ impl MeteredReceiver { Err(err) => Err(err.into()), }; - self.meter.note_channel_len(self.len()); result } @@ -288,7 +291,6 @@ impl MeteredReceiver { Err(err) => Err(err), }; - self.meter.note_channel_len(self.len()); result } @@ -297,6 +299,12 @@ impl MeteredReceiver { pub fn len(&self) -> usize { self.inner.len() } + + #[cfg(feature = "futures_channel")] + /// Returns the current number of messages in the channel based on meter approximation + pub fn len(&self) -> usize { + self.meter.calculate_channel_len() + } } impl futures::stream::FusedStream for MeteredReceiver { @@ -346,6 +354,10 @@ impl MeteredSender { /// Get an updated accessor object for all metrics collected. pub fn meter(&self) -> &Meter { + // For async_channel we can update channel length in the meter access + // to avoid more expensive updates on each RW operation + #[cfg(feature = "async_channel")] + self.meter.note_channel_len(self.len()); &self.meter } @@ -382,7 +394,6 @@ impl MeteredSender { SendError::Closed(err.0.into()) }); - self.meter.note_channel_len(self.len()); result } @@ -420,7 +431,6 @@ impl MeteredSender { TrySendError::from(e) }); - self.meter.note_channel_len(self.len()); result } @@ -429,4 +439,10 @@ impl MeteredSender { pub fn len(&self) -> usize { self.inner.len() } + + #[cfg(feature = "futures_channel")] + /// Returns the current number of messages in the channel based on meter approximation + pub fn len(&self) -> usize { + self.meter.calculate_channel_len() + } } diff --git a/metered-channel/src/lib.rs b/metered-channel/src/lib.rs index adecf06..1fa32be 100644 --- a/metered-channel/src/lib.rs +++ b/metered-channel/src/lib.rs @@ -42,6 +42,9 @@ use coarsetime::Instant as CoarseInstant; #[cfg(test)] mod tests; +/// Defines the maximum number of time of flight values to be stored. +const TOF_QUEUE_SIZE: usize = 100; + /// A peek into the inner state of a meter. #[derive(Debug, Clone)] pub struct Meter { @@ -54,7 +57,7 @@ pub struct Meter { channel_len: Arc, // Number of times senders blocked while sending messages to a subsystem. blocked: Arc, - // Atomic ringbuffer of the last 50 time of flight values + // Atomic ringbuffer of the last `TOF_QUEUE_SIZE` time of flight values tof: Arc>, } @@ -66,7 +69,7 @@ impl std::default::Default for Meter { #[cfg(feature = "async_channel")] channel_len: Arc::new(AtomicUsize::new(0)), blocked: Arc::new(AtomicUsize::new(0)), - tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)), + tof: Arc::new(crossbeam_queue::ArrayQueue::new(TOF_QUEUE_SIZE)), } } } @@ -140,6 +143,13 @@ impl Meter { fn note_time_of_flight(&self, tof: CoarseDuration) { let _ = self.tof.force_push(tof); } + + #[cfg(feature = "futures_channel")] + fn calculate_channel_len(&self) -> usize { + let sent = self.sent.load(Ordering::Relaxed); + let received = self.received.load(Ordering::Relaxed); + sent.saturating_sub(received) as usize + } } /// Determine if this instance shall be measured