Skip to content

Commit

Permalink
add channel len metric
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Sandu <[email protected]>
  • Loading branch information
sandreim committed Sep 15, 2023
1 parent a7f1187 commit 7c002ca
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
9 changes: 9 additions & 0 deletions metered-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct Meter {
sent: Arc<AtomicUsize>,
// Number of receives on this channel.
received: Arc<AtomicUsize>,
// Approximative number of elements in queue.
channel_len: Arc<AtomicUsize>,
// Number of times senders blocked while sending messages to a subsystem.
blocked: Arc<AtomicUsize>,
// Atomic ringbuffer of the last 50 time of flight values
Expand All @@ -60,6 +62,7 @@ impl std::default::Default for Meter {
Self {
sent: Arc::new(AtomicUsize::new(0)),
received: Arc::new(AtomicUsize::new(0)),
channel_len: Arc::new(AtomicUsize::new(0)),
blocked: Arc::new(AtomicUsize::new(0)),
tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)),
}
Expand All @@ -75,6 +78,8 @@ pub struct Readout {
pub sent: usize,
/// The amount of messages received on the channel, in aggregate.
pub received: usize,
/// An approximation of the queue size.
pub channel_len: usize,
/// How many times the caller blocked when sending messages.
pub blocked: usize,
/// Time of flight in micro seconds (us)
Expand All @@ -89,6 +94,7 @@ impl Meter {
Readout {
sent: self.sent.load(Ordering::Relaxed),
received: self.received.load(Ordering::Relaxed),
channel_len: self.channel_len.load(Ordering::Relaxed),
blocked: self.blocked.load(Ordering::Relaxed),
tof: {
let mut acc = Vec::with_capacity(self.tof.len());
Expand All @@ -101,15 +107,18 @@ impl Meter {
}

fn note_sent(&self) -> usize {
self.channel_len.fetch_add(1, Ordering::Relaxed);
self.sent.fetch_add(1, Ordering::Relaxed)
}

fn retract_sent(&self) {
self.sent.fetch_sub(1, Ordering::Relaxed);
self.channel_len.fetch_add(1, Ordering::Relaxed);
}

fn note_received(&self) {
self.received.fetch_add(1, Ordering::Relaxed);
self.channel_len.fetch_sub(1, Ordering::Relaxed);
}

fn note_blocked(&self) {
Expand Down
8 changes: 4 additions & 4 deletions metered-channel/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ fn try_send_try_next() {
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. });
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, blocked: 0, tof } => {
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, channel_len: 1, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 1);
});
rx.try_next().unwrap();
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, blocked: 0, tof } => {
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, channel_len: 0, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 0);
});
Expand All @@ -68,12 +68,12 @@ fn try_send_try_next_unbounded() {
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. });
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, blocked: 0, tof } => {
assert_matches!(tx.meter().read(), Readout { sent: 4, channel_len: 1, received: 3, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 1);
});
rx.try_next().unwrap();
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, blocked: 0, tof } => {
assert_matches!(rx.meter().read(), Readout { sent: 4, channel_len: 0, received: 4, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 0);
});
Expand Down

0 comments on commit 7c002ca

Please sign in to comment.