From e6553c4ee3c5914cba885a6498451fcd0de3e506 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Tue, 19 Sep 2023 07:13:19 -0500 Subject: [PATCH 1/4] sync: add Semaphore example using an `Arc` (#5956) --- tokio/src/sync/semaphore.rs | 73 ++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index ef18afa32f0..3f7db8a3cca 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -47,32 +47,6 @@ use std::sync::Arc; /// } /// ``` /// -/// Use [`Semaphore::acquire_owned`] to move permits across tasks: -/// -/// ``` -/// use std::sync::Arc; -/// use tokio::sync::Semaphore; -/// -/// #[tokio::main] -/// async fn main() { -/// let semaphore = Arc::new(Semaphore::new(3)); -/// let mut join_handles = Vec::new(); -/// -/// for _ in 0..5 { -/// let permit = semaphore.clone().acquire_owned().await.unwrap(); -/// join_handles.push(tokio::spawn(async move { -/// // perform task... -/// // explicitly own `permit` in the task -/// drop(permit); -/// })); -/// } -/// -/// for handle in join_handles { -/// handle.await.unwrap(); -/// } -/// } -/// ``` -/// /// Limit the number of simultaneously opened files in your program. /// /// Most operating systems have limits on the number of open file @@ -102,6 +76,53 @@ use std::sync::Arc; /// } /// ``` /// +/// Limit the number of incoming requests being handled at the same time. +/// +/// Similar to limiting the number of simultaneously opened files, network handles +/// are a limited resource. Allowing an unbounded amount of requests to be processed +/// could result in a denial-of-service, among many other issues. +/// +/// This example uses an `Arc` instead of a global variable. +/// To limit the number of requests that can be processed at the time, +/// we acquire a permit for each task before spawning it. Once acquired, +/// a new task is spawned; and once finished, the permit is dropped inside +/// of the task to allow others to spawn. Permits must be acquired via +/// [`Semaphore::acquire_owned`] to be movable across the task boundary. +/// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.) +/// +/// ```no_run +/// use std::sync::Arc; +/// use tokio::sync::Semaphore; +/// use tokio::net::TcpListener; +/// +/// #[tokio::main] +/// async fn main() -> std::io::Result<()> { +/// let semaphore = Arc::new(Semaphore::new(3)); +/// let listener = TcpListener::bind("127.0.0.1:8080").await?; +/// +/// loop { +/// // Acquire permit before accepting the next socket. +/// // +/// // We use `acquire_owned` so that we can move `permit` into +/// // other tasks. +/// let permit = semaphore.clone().acquire_owned().await.unwrap(); +/// let (mut socket, _) = listener.accept().await?; +/// +/// tokio::spawn(async move { +/// // Do work using the socket. +/// handle_connection(&mut socket).await; +/// // Drop socket while the permit is still live. +/// drop(socket); +/// // Drop the permit, so more tasks can be created. +/// drop(permit); +/// }); +/// } +/// } +/// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) { +/// # // Do work +/// # } +/// ``` +/// /// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html /// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned #[derive(Debug)] From 804511822b4be12a958d01fe1156b0a71bc8e6f7 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 19 Sep 2023 15:13:46 +0200 Subject: [PATCH 2/4] sync: rename `watch::mark_unseen` to `watch::mark_changed` (#6014) --- tokio/src/sync/watch.rs | 13 ++++++++++--- tokio/tests/sync_watch.rs | 12 ++++++------ 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 0452a81aa0a..2b4fd561e96 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -380,7 +380,7 @@ mod state { impl Version { /// Get the initial version when creating the channel. pub(super) fn initial() -> Self { - // The initial version is 1 so that `mark_unseen` can decrement by one. + // The initial version is 1 so that `mark_changed` can decrement by one. // (The value is 2 due to the closed bit.) Version(2) } @@ -644,8 +644,15 @@ impl Receiver { Ok(self.version != new_version) } - /// Marks the state as unseen. - pub fn mark_unseen(&mut self) { + /// Marks the state as changed. + /// + /// After invoking this method [`has_changed()`](Self::has_changed) + /// returns `true` and [`changed()`](Self::changed) returns + /// immediately, regardless of whether a new value has been sent. + /// + /// This is useful for triggering an initial change notification after + /// subscribing to synchronize new receivers. + pub fn mark_changed(&mut self) { self.version.decrement(); } diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 5bcb7476888..70cc110b937 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -49,19 +49,19 @@ fn rx_version_underflow() { let (_tx, mut rx) = watch::channel("one"); // Version starts at 2, validate we do not underflow - rx.mark_unseen(); - rx.mark_unseen(); + rx.mark_changed(); + rx.mark_changed(); } #[test] -fn rx_mark_unseen() { +fn rx_mark_changed() { let (tx, mut rx) = watch::channel("one"); let mut rx2 = rx.clone(); let mut rx3 = rx.clone(); let mut rx4 = rx.clone(); { - rx.mark_unseen(); + rx.mark_changed(); assert!(rx.has_changed().unwrap()); let mut t = spawn(rx.changed()); @@ -76,7 +76,7 @@ fn rx_mark_unseen() { } { - rx3.mark_unseen(); + rx3.mark_changed(); assert_eq!(*rx3.borrow(), "one"); assert!(rx3.has_changed().unwrap()); @@ -94,7 +94,7 @@ fn rx_mark_unseen() { assert!(rx4.has_changed().unwrap()); assert_eq!(*rx4.borrow_and_update(), "two"); - rx4.mark_unseen(); + rx4.mark_changed(); assert!(rx4.has_changed().unwrap()); assert_eq!(*rx4.borrow_and_update(), "two") } From 9d51b76d017cfef12e053760fa31f0845c214e3a Mon Sep 17 00:00:00 2001 From: nicflower <50106721+nicflower@users.noreply.github.com> Date: Tue, 19 Sep 2023 16:01:36 +0200 Subject: [PATCH 3/4] sync: add `watch::Sender::new` (#5998) --- tokio/src/sync/watch.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 2b4fd561e96..9e7cd1a7966 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -876,6 +876,27 @@ impl Drop for Receiver { } impl Sender { + /// Creates the sending-half of the [`watch`] channel. + /// + /// See documentation of [`watch::channel`] for errors when calling this function. + /// Beware that attempting to send a value when there are no receivers will + /// return an error. + /// + /// [`watch`]: crate::sync::watch + /// [`watch::channel`]: crate::sync::watch + /// + /// # Examples + /// ``` + /// let sender = tokio::sync::watch::Sender::new(0u8); + /// assert!(sender.send(3).is_err()); + /// let _rec = sender.subscribe(); + /// assert!(sender.send(4).is_ok()); + /// ``` + pub fn new(init: T) -> Self { + let (tx, _) = channel(init); + tx + } + /// Sends a new value via the channel, notifying all receivers. /// /// This method fails if the channel is closed, which is the case when From ad7f988da377c365cacb5ca24d044a9be5de5889 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 19 Sep 2023 17:44:08 +0200 Subject: [PATCH 4/4] sync: fix `mark_changed` when version overflows (#6017) --- tokio/src/sync/watch.rs | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 9e7cd1a7966..61307fce47d 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -359,7 +359,10 @@ mod state { use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::Ordering::SeqCst; - const CLOSED: usize = 1; + const CLOSED_BIT: usize = 1; + + // Using 2 as the step size preserves the `CLOSED_BIT`. + const STEP_SIZE: usize = 2; /// The version part of the state. The lowest bit is always zero. #[derive(Copy, Clone, Debug, Eq, PartialEq)] @@ -378,31 +381,26 @@ mod state { pub(super) struct AtomicState(AtomicUsize); impl Version { - /// Get the initial version when creating the channel. - pub(super) fn initial() -> Self { - // The initial version is 1 so that `mark_changed` can decrement by one. - // (The value is 2 due to the closed bit.) - Version(2) - } - /// Decrements the version. pub(super) fn decrement(&mut self) { - // Decrement by two to avoid touching the CLOSED bit. - if self.0 >= 2 { - self.0 -= 2; - } + // Using a wrapping decrement here is required to ensure that the + // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()` + // which wraps on overflow. + self.0 = self.0.wrapping_sub(STEP_SIZE); } + + pub(super) const INITIAL: Self = Version(0); } impl StateSnapshot { /// Extract the version from the state. pub(super) fn version(self) -> Version { - Version(self.0 & !CLOSED) + Version(self.0 & !CLOSED_BIT) } /// Is the closed bit set? pub(super) fn is_closed(self) -> bool { - (self.0 & CLOSED) == CLOSED + (self.0 & CLOSED_BIT) == CLOSED_BIT } } @@ -410,7 +408,7 @@ mod state { /// Create a new `AtomicState` that is not closed and which has the /// version set to `Version::initial()`. pub(super) fn new() -> Self { - AtomicState(AtomicUsize::new(2)) + AtomicState(AtomicUsize::new(Version::INITIAL.0)) } /// Load the current value of the state. @@ -420,13 +418,12 @@ mod state { /// Increment the version counter. pub(super) fn increment_version(&self) { - // Increment by two to avoid touching the CLOSED bit. - self.0.fetch_add(2, SeqCst); + self.0.fetch_add(STEP_SIZE, SeqCst); } /// Set the closed bit in the state. pub(super) fn set_closed(&self) { - self.0.fetch_or(CLOSED, SeqCst); + self.0.fetch_or(CLOSED_BIT, SeqCst); } } } @@ -482,7 +479,7 @@ pub fn channel(init: T) -> (Sender, Receiver) { let rx = Receiver { shared, - version: Version::initial(), + version: Version::INITIAL, }; (tx, rx)