diff --git a/metered-channel/src/bounded.rs b/metered-channel/src/bounded.rs index 3a48982..e392f5b 100644 --- a/metered-channel/src/bounded.rs +++ b/metered-channel/src/bounded.rs @@ -224,7 +224,8 @@ impl Stream for MeteredReceiver { impl MeteredReceiver { fn maybe_meter_tof(&mut self, maybe_value: Option>) -> Option { self.meter.note_received(); - maybe_value.map(|value| { + + let result = maybe_value.map(|value| { match value { MaybeTimeOfFlight::::WithTimeOfFlight(value, tof_start) => { // do not use `.elapsed()` of `std::time`, it may panic @@ -236,7 +237,11 @@ impl MeteredReceiver { MaybeTimeOfFlight::::Bare(value) => value, } .into() - }) + }); + + #[cfg(feature = "async_channel")] + self.meter.note_channel_len(self.inner.len()); + result } /// Get an updated accessor object for all metrics collected. @@ -330,6 +335,9 @@ impl MeteredSender { } else { MaybeTimeOfFlight::Bare(item) }; + #[cfg(feature = "async_channel")] + self.meter.note_channel_len(self.inner.len()); + item } @@ -352,7 +360,10 @@ impl MeteredSender { self.meter.note_blocked(); self.meter.note_sent(); // we are going to do full blocking send, so we have to note it here let msg = send_err.into_inner().into(); - self.send_to_channel(msg).await + let result = self.send_to_channel(msg).await; + #[cfg(feature = "async_channel")] + self.meter.note_channel_len(self.inner.len()); + result }, _ => Ok(()), } diff --git a/metered-channel/src/lib.rs b/metered-channel/src/lib.rs index b89241c..d0512dc 100644 --- a/metered-channel/src/lib.rs +++ b/metered-channel/src/lib.rs @@ -107,17 +107,25 @@ impl Meter { } fn note_sent(&self) -> usize { + #[cfg(feature = "futures_channel")] self.channel_len.fetch_add(1, Ordering::Relaxed); self.sent.fetch_add(1, Ordering::Relaxed) } fn retract_sent(&self) { self.sent.fetch_sub(1, Ordering::Relaxed); + #[cfg(feature = "futures_channel")] self.channel_len.fetch_add(1, Ordering::Relaxed); } + #[cfg(feature = "async_channel")] + fn note_channel_len(&self, len: usize) { + self.channel_len.store(len, Ordering::Relaxed); + } + fn note_received(&self) { self.received.fetch_add(1, Ordering::Relaxed); + #[cfg(feature = "futures_channel")] self.channel_len.fetch_sub(1, Ordering::Relaxed); } diff --git a/metered-channel/src/unbounded.rs b/metered-channel/src/unbounded.rs index f1d4063..6658359 100644 --- a/metered-channel/src/unbounded.rs +++ b/metered-channel/src/unbounded.rs @@ -84,7 +84,7 @@ impl Stream for UnboundedMeteredReceiver { impl UnboundedMeteredReceiver { fn maybe_meter_tof(&mut self, maybe_value: Option>) -> Option { self.meter.note_received(); - maybe_value.map(|value| { + let result = maybe_value.map(|value| { match value { MaybeTimeOfFlight::::WithTimeOfFlight(value, tof_start) => { // do not use `.elapsed()` of `std::time`, it may panic @@ -96,7 +96,12 @@ impl UnboundedMeteredReceiver { MaybeTimeOfFlight::::Bare(value) => value, } .into() - }) + }); + + #[cfg(feature = "async_channel")] + self.meter.note_channel_len(self.inner.len()); + + result } /// Get an updated accessor object for all metrics collected. @@ -170,6 +175,10 @@ impl UnboundedMeteredSender { } else { MaybeTimeOfFlight::Bare(item) }; + + #[cfg(feature = "async_channel")] + self.meter.note_channel_len(self.inner.len()); + item }