Skip to content

Commit

Permalink
Merge branch 'master' into reduce-lock-contention
Browse files Browse the repository at this point in the history
  • Loading branch information
wathenjiang authored Sep 20, 2023
2 parents 191edf6 + ad7f988 commit 06a675c
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 53 deletions.
73 changes: 47 additions & 26 deletions tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,6 @@ use std::sync::Arc;
/// }
/// ```
///
/// Use [`Semaphore::acquire_owned`] to move permits across tasks:
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// #[tokio::main]
/// async fn main() {
/// let semaphore = Arc::new(Semaphore::new(3));
/// let mut join_handles = Vec::new();
///
/// for _ in 0..5 {
/// let permit = semaphore.clone().acquire_owned().await.unwrap();
/// join_handles.push(tokio::spawn(async move {
/// // perform task...
/// // explicitly own `permit` in the task
/// drop(permit);
/// }));
/// }
///
/// for handle in join_handles {
/// handle.await.unwrap();
/// }
/// }
/// ```
///
/// Limit the number of simultaneously opened files in your program.
///
/// Most operating systems have limits on the number of open file
Expand Down Expand Up @@ -102,6 +76,53 @@ use std::sync::Arc;
/// }
/// ```
///
/// Limit the number of incoming requests being handled at the same time.
///
/// Similar to limiting the number of simultaneously opened files, network handles
/// are a limited resource. Allowing an unbounded amount of requests to be processed
/// could result in a denial-of-service, among many other issues.
///
/// This example uses an `Arc<Semaphore>` instead of a global variable.
/// To limit the number of requests that can be processed at the time,
/// we acquire a permit for each task before spawning it. Once acquired,
/// a new task is spawned; and once finished, the permit is dropped inside
/// of the task to allow others to spawn. Permits must be acquired via
/// [`Semaphore::acquire_owned`] to be movable across the task boundary.
/// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.)
///
/// ```no_run
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
/// use tokio::net::TcpListener;
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
/// let semaphore = Arc::new(Semaphore::new(3));
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// // Acquire permit before accepting the next socket.
/// //
/// // We use `acquire_owned` so that we can move `permit` into
/// // other tasks.
/// let permit = semaphore.clone().acquire_owned().await.unwrap();
/// let (mut socket, _) = listener.accept().await?;
///
/// tokio::spawn(async move {
/// // Do work using the socket.
/// handle_connection(&mut socket).await;
/// // Drop socket while the permit is still live.
/// drop(socket);
/// // Drop the permit, so more tasks can be created.
/// drop(permit);
/// });
/// }
/// }
/// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
/// # // Do work
/// # }
/// ```
///
/// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
#[derive(Debug)]
Expand Down
67 changes: 46 additions & 21 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,10 @@ mod state {
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::SeqCst;

const CLOSED: usize = 1;
const CLOSED_BIT: usize = 1;

// Using 2 as the step size preserves the `CLOSED_BIT`.
const STEP_SIZE: usize = 2;

/// The version part of the state. The lowest bit is always zero.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
Expand All @@ -378,39 +381,34 @@ mod state {
pub(super) struct AtomicState(AtomicUsize);

impl Version {
/// Get the initial version when creating the channel.
pub(super) fn initial() -> Self {
// The initial version is 1 so that `mark_unseen` can decrement by one.
// (The value is 2 due to the closed bit.)
Version(2)
}

/// Decrements the version.
pub(super) fn decrement(&mut self) {
// Decrement by two to avoid touching the CLOSED bit.
if self.0 >= 2 {
self.0 -= 2;
}
// Using a wrapping decrement here is required to ensure that the
// operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
// which wraps on overflow.
self.0 = self.0.wrapping_sub(STEP_SIZE);
}

pub(super) const INITIAL: Self = Version(0);
}

impl StateSnapshot {
/// Extract the version from the state.
pub(super) fn version(self) -> Version {
Version(self.0 & !CLOSED)
Version(self.0 & !CLOSED_BIT)
}

/// Is the closed bit set?
pub(super) fn is_closed(self) -> bool {
(self.0 & CLOSED) == CLOSED
(self.0 & CLOSED_BIT) == CLOSED_BIT
}
}

impl AtomicState {
/// Create a new `AtomicState` that is not closed and which has the
/// version set to `Version::initial()`.
pub(super) fn new() -> Self {
AtomicState(AtomicUsize::new(2))
AtomicState(AtomicUsize::new(Version::INITIAL.0))
}

/// Load the current value of the state.
Expand All @@ -420,13 +418,12 @@ mod state {

/// Increment the version counter.
pub(super) fn increment_version(&self) {
// Increment by two to avoid touching the CLOSED bit.
self.0.fetch_add(2, SeqCst);
self.0.fetch_add(STEP_SIZE, SeqCst);
}

/// Set the closed bit in the state.
pub(super) fn set_closed(&self) {
self.0.fetch_or(CLOSED, SeqCst);
self.0.fetch_or(CLOSED_BIT, SeqCst);
}
}
}
Expand Down Expand Up @@ -482,7 +479,7 @@ pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {

let rx = Receiver {
shared,
version: Version::initial(),
version: Version::INITIAL,
};

(tx, rx)
Expand Down Expand Up @@ -644,8 +641,15 @@ impl<T> Receiver<T> {
Ok(self.version != new_version)
}

/// Marks the state as unseen.
pub fn mark_unseen(&mut self) {
/// Marks the state as changed.
///
/// After invoking this method [`has_changed()`](Self::has_changed)
/// returns `true` and [`changed()`](Self::changed) returns
/// immediately, regardless of whether a new value has been sent.
///
/// This is useful for triggering an initial change notification after
/// subscribing to synchronize new receivers.
pub fn mark_changed(&mut self) {
self.version.decrement();
}

Expand Down Expand Up @@ -869,6 +873,27 @@ impl<T> Drop for Receiver<T> {
}

impl<T> Sender<T> {
/// Creates the sending-half of the [`watch`] channel.
///
/// See documentation of [`watch::channel`] for errors when calling this function.
/// Beware that attempting to send a value when there are no receivers will
/// return an error.
///
/// [`watch`]: crate::sync::watch
/// [`watch::channel`]: crate::sync::watch
///
/// # Examples
/// ```
/// let sender = tokio::sync::watch::Sender::new(0u8);
/// assert!(sender.send(3).is_err());
/// let _rec = sender.subscribe();
/// assert!(sender.send(4).is_ok());
/// ```
pub fn new(init: T) -> Self {
let (tx, _) = channel(init);
tx
}

/// Sends a new value via the channel, notifying all receivers.
///
/// This method fails if the channel is closed, which is the case when
Expand Down
12 changes: 6 additions & 6 deletions tokio/tests/sync_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ fn rx_version_underflow() {
let (_tx, mut rx) = watch::channel("one");

// Version starts at 2, validate we do not underflow
rx.mark_unseen();
rx.mark_unseen();
rx.mark_changed();
rx.mark_changed();
}

#[test]
fn rx_mark_unseen() {
fn rx_mark_changed() {
let (tx, mut rx) = watch::channel("one");

let mut rx2 = rx.clone();
let mut rx3 = rx.clone();
let mut rx4 = rx.clone();
{
rx.mark_unseen();
rx.mark_changed();
assert!(rx.has_changed().unwrap());

let mut t = spawn(rx.changed());
Expand All @@ -76,7 +76,7 @@ fn rx_mark_unseen() {
}

{
rx3.mark_unseen();
rx3.mark_changed();
assert_eq!(*rx3.borrow(), "one");

assert!(rx3.has_changed().unwrap());
Expand All @@ -94,7 +94,7 @@ fn rx_mark_unseen() {
assert!(rx4.has_changed().unwrap());
assert_eq!(*rx4.borrow_and_update(), "two");

rx4.mark_unseen();
rx4.mark_changed();
assert!(rx4.has_changed().unwrap());
assert_eq!(*rx4.borrow_and_update(), "two")
}
Expand Down

0 comments on commit 06a675c

Please sign in to comment.