From 02b779e315c5c5f0dbbc8b56fc711cc8e665ee1e Mon Sep 17 00:00:00 2001
From: Paul Olteanu
Date: Sat, 30 Dec 2023 15:05:37 -0500
Subject: [PATCH] sync: add `watch::Receiver::mark_unchanged` (#6252)
---
tokio/src/sync/watch.rs | 11 +++++++++++
tokio/tests/sync_watch.rs | 33 +++++++++++++++++++++++++++++++++
2 files changed, 44 insertions(+)
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs
index 587aa795aef..3979b07202f 100644
--- a/tokio/src/sync/watch.rs
+++ b/tokio/src/sync/watch.rs
@@ -669,6 +669,17 @@ impl Receiver {
self.version.decrement();
}
+ /// Marks the state as unchanged.
+ ///
+ /// The current value will be considered seen by the receiver.
+ ///
+ /// This is useful if you are not interested in the current value
+ /// visible in the receiver.
+ pub fn mark_unchanged(&mut self) {
+ let current_version = self.shared.state.load().version();
+ self.version = current_version;
+ }
+
/// Waits for a change notification, then marks the newest value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs
index 70cc110b937..a5b229f7ddc 100644
--- a/tokio/tests/sync_watch.rs
+++ b/tokio/tests/sync_watch.rs
@@ -102,6 +102,39 @@ fn rx_mark_changed() {
assert_eq!(*rx.borrow(), "two");
}
+#[test]
+fn rx_mark_unchanged() {
+ let (tx, mut rx) = watch::channel("one");
+
+ let mut rx2 = rx.clone();
+
+ {
+ assert!(!rx.has_changed().unwrap());
+
+ rx.mark_changed();
+ assert!(rx.has_changed().unwrap());
+
+ rx.mark_unchanged();
+ assert!(!rx.has_changed().unwrap());
+
+ let mut t = spawn(rx.changed());
+ assert_pending!(t.poll());
+ }
+
+ {
+ assert!(!rx2.has_changed().unwrap());
+
+ tx.send("two").unwrap();
+ assert!(rx2.has_changed().unwrap());
+
+ rx2.mark_unchanged();
+ assert!(!rx2.has_changed().unwrap());
+ assert_eq!(*rx2.borrow_and_update(), "two");
+ }
+
+ assert_eq!(*rx.borrow(), "two");
+}
+
#[test]
fn multi_rx() {
let (tx, mut rx1) = watch::channel("one");