diff --git a/metered-channel/src/bounded.rs b/metered-channel/src/bounded.rs index 3a48982..8280e93 100644 --- a/metered-channel/src/bounded.rs +++ b/metered-channel/src/bounded.rs @@ -224,6 +224,7 @@ impl Stream for MeteredReceiver { impl MeteredReceiver { fn maybe_meter_tof(&mut self, maybe_value: Option>) -> Option { self.meter.note_received(); + maybe_value.map(|value| { match value { MaybeTimeOfFlight::::WithTimeOfFlight(value, tof_start) => { @@ -256,30 +257,39 @@ impl MeteredReceiver { /// Attempt to receive the next item. #[cfg(feature = "async_channel")] pub fn try_next(&mut self) -> Result, TryRecvError> { - match self.inner.try_recv() { + let result = match self.inner.try_recv() { Ok(value) => Ok(self.maybe_meter_tof(Some(value))), Err(err) => Err(err), - } + }; + + self.meter.note_channel_len(self.len()); + result } /// Receive the next item. #[cfg(feature = "async_channel")] pub async fn recv(&mut self) -> Result { - match self.inner.recv().await { + let result = match self.inner.recv().await { Ok(value) => Ok(self.maybe_meter_tof(Some(value)).expect("wrapped value is always Some, qed")), Err(err) => Err(err.into()), - } + }; + + self.meter.note_channel_len(self.len()); + result } /// Attempt to receive the next item without blocking #[cfg(feature = "async_channel")] pub fn try_recv(&mut self) -> Result { - match self.inner.try_recv() { + let result = match self.inner.try_recv() { Ok(value) => Ok(self.maybe_meter_tof(Some(value)).expect("wrapped value is always Some, qed")), Err(err) => Err(err), - } + }; + + self.meter.note_channel_len(self.len()); + result } #[cfg(feature = "async_channel")] @@ -330,6 +340,7 @@ impl MeteredSender { } else { MaybeTimeOfFlight::Bare(item) }; + item } @@ -366,10 +377,13 @@ impl MeteredSender { ) -> result::Result<(), SendError> { let fut = self.inner.send(msg); futures::pin_mut!(fut); - fut.await.map_err(|err| { + let result = fut.await.map_err(|err| { self.meter.retract_sent(); SendError::Closed(err.0.into()) - }) + }); + + self.meter.note_channel_len(self.len()); + result } #[cfg(feature = "futures_channel")] @@ -401,10 +415,13 @@ impl MeteredSender { /// Attempt to send message or fail immediately. pub fn try_send(&mut self, msg: T) -> result::Result<(), TrySendError> { let msg = self.prepare_with_tof(msg); // note_sent is called in here - self.inner.try_send(msg).map_err(|e| { + let result = self.inner.try_send(msg).map_err(|e| { self.meter.retract_sent(); // we didn't send it, so we need to undo the note_send TrySendError::from(e) - }) + }); + + self.meter.note_channel_len(self.len()); + result } #[cfg(feature = "async_channel")] diff --git a/metered-channel/src/lib.rs b/metered-channel/src/lib.rs index 7de62a8..adecf06 100644 --- a/metered-channel/src/lib.rs +++ b/metered-channel/src/lib.rs @@ -49,6 +49,9 @@ pub struct Meter { sent: Arc, // Number of receives on this channel. received: Arc, + #[cfg(feature = "async_channel")] + // Number of elements in the channel. + channel_len: Arc, // Number of times senders blocked while sending messages to a subsystem. blocked: Arc, // Atomic ringbuffer of the last 50 time of flight values @@ -60,6 +63,8 @@ impl std::default::Default for Meter { Self { sent: Arc::new(AtomicUsize::new(0)), received: Arc::new(AtomicUsize::new(0)), + #[cfg(feature = "async_channel")] + channel_len: Arc::new(AtomicUsize::new(0)), blocked: Arc::new(AtomicUsize::new(0)), tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)), } @@ -75,6 +80,8 @@ pub struct Readout { pub sent: usize, /// The amount of messages received on the channel, in aggregate. pub received: usize, + /// An approximation of the queue size. + pub channel_len: usize, /// How many times the caller blocked when sending messages. pub blocked: usize, /// Time of flight in micro seconds (us) @@ -86,9 +93,18 @@ impl Meter { pub fn read(&self) -> Readout { // when obtaining we don't care much about off by one // accuracy + let sent = self.sent.load(Ordering::Relaxed); + let received = self.received.load(Ordering::Relaxed); + + #[cfg(feature = "async_channel")] + let channel_len = self.channel_len.load(Ordering::Relaxed); + #[cfg(feature = "futures_channel")] + let channel_len = sent.saturating_sub(received); + Readout { - sent: self.sent.load(Ordering::Relaxed), - received: self.received.load(Ordering::Relaxed), + sent, + received, + channel_len, blocked: self.blocked.load(Ordering::Relaxed), tof: { let mut acc = Vec::with_capacity(self.tof.len()); @@ -104,6 +120,11 @@ impl Meter { self.sent.fetch_add(1, Ordering::Relaxed) } + #[cfg(feature = "async_channel")] + fn note_channel_len(&self, len: usize) { + self.channel_len.store(len, Ordering::Relaxed) + } + fn retract_sent(&self) { self.sent.fetch_sub(1, Ordering::Relaxed); } diff --git a/metered-channel/src/tests.rs b/metered-channel/src/tests.rs index a5ed9a5..01eaced 100644 --- a/metered-channel/src/tests.rs +++ b/metered-channel/src/tests.rs @@ -39,12 +39,12 @@ fn try_send_try_next() { assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. }); rx.try_next().unwrap(); rx.try_next().unwrap(); - assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, blocked: 0, tof } => { + assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, channel_len: 1, blocked: 0, tof } => { // every second in test, consumed before assert_eq!(dbg!(tof).len(), 1); }); rx.try_next().unwrap(); - assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, blocked: 0, tof } => { + assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, channel_len: 0, blocked: 0, tof } => { // every second in test, consumed before assert_eq!(dbg!(tof).len(), 0); }); @@ -68,12 +68,12 @@ fn try_send_try_next_unbounded() { assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. }); rx.try_next().unwrap(); rx.try_next().unwrap(); - assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, blocked: 0, tof } => { + assert_matches!(tx.meter().read(), Readout { sent: 4, channel_len: 1, received: 3, blocked: 0, tof } => { // every second in test, consumed before assert_eq!(dbg!(tof).len(), 1); }); rx.try_next().unwrap(); - assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, blocked: 0, tof } => { + assert_matches!(rx.meter().read(), Readout { sent: 4, channel_len: 0, received: 4, blocked: 0, tof } => { // every second in test, consumed before assert_eq!(dbg!(tof).len(), 0); }); diff --git a/metered-channel/src/unbounded.rs b/metered-channel/src/unbounded.rs index f1d4063..f2144c8 100644 --- a/metered-channel/src/unbounded.rs +++ b/metered-channel/src/unbounded.rs @@ -116,10 +116,13 @@ impl UnboundedMeteredReceiver { /// Attempt to receive the next item. #[cfg(feature = "async_channel")] pub fn try_next(&mut self) -> Result, TryRecvError> { - match self.inner.try_recv() { + let result = match self.inner.try_recv() { Ok(value) => Ok(self.maybe_meter_tof(Some(value))), Err(err) => Err(err), - } + }; + + self.meter.note_channel_len(self.len()); + result } /// Returns the current number of messages in the channel @@ -170,6 +173,7 @@ impl UnboundedMeteredSender { } else { MaybeTimeOfFlight::Bare(item) }; + item } @@ -192,10 +196,13 @@ impl UnboundedMeteredSender { #[cfg(feature = "async_channel")] pub fn unbounded_send(&self, msg: T) -> result::Result<(), TrySendError>> { let msg = self.prepare_with_tof(msg); - self.inner.try_send(msg).map_err(|e| { + let result = self.inner.try_send(msg).map_err(|e| { self.meter.retract_sent(); e - }) + }); + + self.meter.note_channel_len(self.len()); + result } /// Returns the current number of messages in the channel