Skip to content

Commit

Permalink
use channel len() from async channel
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Sandu <[email protected]>
  • Loading branch information
sandreim committed Sep 15, 2023
1 parent 7c002ca commit a3f0fd9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
17 changes: 14 additions & 3 deletions metered-channel/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ impl<T> Stream for MeteredReceiver<T> {
impl<T> MeteredReceiver<T> {
fn maybe_meter_tof(&mut self, maybe_value: Option<MaybeTimeOfFlight<T>>) -> Option<T> {
self.meter.note_received();
maybe_value.map(|value| {

let result = maybe_value.map(|value| {
match value {
MaybeTimeOfFlight::<T>::WithTimeOfFlight(value, tof_start) => {
// do not use `.elapsed()` of `std::time`, it may panic
Expand All @@ -236,7 +237,11 @@ impl<T> MeteredReceiver<T> {
MaybeTimeOfFlight::<T>::Bare(value) => value,
}
.into()
})
});

#[cfg(feature = "async_channel")]
self.meter.note_channel_len(self.inner.len());
result
}

/// Get an updated accessor object for all metrics collected.
Expand Down Expand Up @@ -330,6 +335,9 @@ impl<T> MeteredSender<T> {
} else {
MaybeTimeOfFlight::Bare(item)
};
#[cfg(feature = "async_channel")]
self.meter.note_channel_len(self.inner.len());

item
}

Expand All @@ -352,7 +360,10 @@ impl<T> MeteredSender<T> {
self.meter.note_blocked();
self.meter.note_sent(); // we are going to do full blocking send, so we have to note it here
let msg = send_err.into_inner().into();
self.send_to_channel(msg).await
let result = self.send_to_channel(msg).await;
#[cfg(feature = "async_channel")]
self.meter.note_channel_len(self.inner.len());
result
},
_ => Ok(()),
}
Expand Down
8 changes: 8 additions & 0 deletions metered-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,25 @@ impl Meter {
}

fn note_sent(&self) -> usize {
#[cfg(feature = "futures_channel")]
self.channel_len.fetch_add(1, Ordering::Relaxed);
self.sent.fetch_add(1, Ordering::Relaxed)
}

fn retract_sent(&self) {
self.sent.fetch_sub(1, Ordering::Relaxed);
#[cfg(feature = "futures_channel")]
self.channel_len.fetch_add(1, Ordering::Relaxed);
}

#[cfg(feature = "async_channel")]
fn note_channel_len(&self, len: usize) {
self.channel_len.store(len, Ordering::Relaxed);
}

fn note_received(&self) {
self.received.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "futures_channel")]
self.channel_len.fetch_sub(1, Ordering::Relaxed);
}

Expand Down
13 changes: 11 additions & 2 deletions metered-channel/src/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<T> Stream for UnboundedMeteredReceiver<T> {
impl<T> UnboundedMeteredReceiver<T> {
fn maybe_meter_tof(&mut self, maybe_value: Option<MaybeTimeOfFlight<T>>) -> Option<T> {
self.meter.note_received();
maybe_value.map(|value| {
let result = maybe_value.map(|value| {
match value {
MaybeTimeOfFlight::<T>::WithTimeOfFlight(value, tof_start) => {
// do not use `.elapsed()` of `std::time`, it may panic
Expand All @@ -96,7 +96,12 @@ impl<T> UnboundedMeteredReceiver<T> {
MaybeTimeOfFlight::<T>::Bare(value) => value,
}
.into()
})
});

#[cfg(feature = "async_channel")]
self.meter.note_channel_len(self.inner.len());

result
}

/// Get an updated accessor object for all metrics collected.
Expand Down Expand Up @@ -170,6 +175,10 @@ impl<T> UnboundedMeteredSender<T> {
} else {
MaybeTimeOfFlight::Bare(item)
};

#[cfg(feature = "async_channel")]
self.meter.note_channel_len(self.inner.len());

item
}

Expand Down

0 comments on commit a3f0fd9

Please sign in to comment.