From 992ca374db3985f9b029b955e66b7f24fa9338dd Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Wed, 26 Apr 2023 18:27:30 -0600 Subject: [PATCH] feat(s2n-quic-core): add sync::worker module (#1728) --- quic/s2n-quic-core/src/sync.rs | 6 + .../src/sync/{spsc => }/primitive.rs | 13 +- quic/s2n-quic-core/src/sync/spsc.rs | 1 - quic/s2n-quic-core/src/sync/spsc/recv.rs | 54 ++++- quic/s2n-quic-core/src/sync/spsc/send.rs | 37 ++- quic/s2n-quic-core/src/sync/spsc/state.rs | 6 +- quic/s2n-quic-core/src/sync/spsc/tests.rs | 14 +- quic/s2n-quic-core/src/sync/worker.rs | 217 ++++++++++++++++++ quic/s2n-quic-core/src/testing.rs | 16 ++ 9 files changed, 338 insertions(+), 26 deletions(-) rename quic/s2n-quic-core/src/sync/{spsc => }/primitive.rs (81%) create mode 100644 quic/s2n-quic-core/src/sync/worker.rs diff --git a/quic/s2n-quic-core/src/sync.rs b/quic/s2n-quic-core/src/sync.rs index 8881093b07..4cc91952bc 100644 --- a/quic/s2n-quic-core/src/sync.rs +++ b/quic/s2n-quic-core/src/sync.rs @@ -1,5 +1,11 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +#[cfg(feature = "alloc")] +mod primitive; + #[cfg(feature = "alloc")] pub mod spsc; + +#[cfg(feature = "alloc")] +pub mod worker; diff --git a/quic/s2n-quic-core/src/sync/spsc/primitive.rs b/quic/s2n-quic-core/src/sync/primitive.rs similarity index 81% rename from quic/s2n-quic-core/src/sync/spsc/primitive.rs rename to quic/s2n-quic-core/src/sync/primitive.rs index 1a6453229b..ebe08e86a9 100644 --- a/quic/s2n-quic-core/src/sync/spsc/primitive.rs +++ b/quic/s2n-quic-core/src/sync/primitive.rs @@ -2,11 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 #[cfg(all(loom, test))] -mod loom { +mod loom_primitive { use ::core::task::Waker; use ::loom::future::AtomicWaker as Inner; - pub use ::loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + pub use ::loom::sync::{atomic::*, Arc}; #[derive(Debug, Default)] pub struct AtomicWaker(Inner); @@ -31,15 +31,16 @@ mod loom { } #[cfg(all(loom, test))] -pub use self::loom::*; +pub use self::loom_primitive::*; -mod core { - pub use ::core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +mod core_primitive { + pub use ::core::sync::atomic::*; + pub use alloc::sync::Arc; pub use atomic_waker::AtomicWaker; } #[cfg(not(all(loom, test)))] -pub use self::core::*; +pub use self::core_primitive::*; /// Indicates if the type is a zero-sized type /// diff --git a/quic/s2n-quic-core/src/sync/spsc.rs b/quic/s2n-quic-core/src/sync/spsc.rs index 7527471708..c9e0eac7c6 100644 --- a/quic/s2n-quic-core/src/sync/spsc.rs +++ b/quic/s2n-quic-core/src/sync/spsc.rs @@ -1,7 +1,6 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -mod primitive; mod recv; mod send; mod slice; diff --git a/quic/s2n-quic-core/src/sync/spsc/recv.rs b/quic/s2n-quic-core/src/sync/spsc/recv.rs index 175916c88d..3e6db823f0 100644 --- a/quic/s2n-quic-core/src/sync/spsc/recv.rs +++ b/quic/s2n-quic-core/src/sync/spsc/recv.rs @@ -2,7 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use super::{state::Side, Cursor, Result, State}; -use core::task::{Context, Poll}; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; pub struct Receiver(pub(super) State); @@ -27,6 +31,21 @@ impl Receiver { self.0.cursor.is_full() } + /// Returns the currently acquired slice of entries for the receiver + /// + /// Callers should call [`Self::acquire`] or [`Self::poll_slice`] before calling this method. + #[inline] + pub fn slice(&mut self) -> RecvSlice { + let cursor = self.0.cursor; + RecvSlice(&mut self.0, cursor) + } + + /// Blocks until at least one entry is available for consumption + #[inline] + pub async fn acquire(&mut self) -> Result<()> { + Acquire { receiver: self }.await + } + #[inline] pub fn poll_slice(&mut self, cx: &mut Context) -> Poll>> { macro_rules! acquire_filled { @@ -124,6 +143,23 @@ impl<'a, T> RecvSlice<'a, T> { len } + /// Releases `len` entries back to the sender + #[inline] + pub fn release(&mut self, len: usize) { + let (pair, _) = self.0.as_pairs(); + + debug_assert!(pair.len() >= len, "cannot release more than was acquired"); + + for entry in pair.iter().take(len) { + unsafe { + // Safety: the state's cursor indicates that each slot in the `iter` contains data + let _ = entry.take(); + } + } + + self.0.cursor.increment_head(len); + } + #[inline] pub fn len(&self) -> usize { self.0.cursor.recv_len() @@ -141,3 +177,19 @@ impl<'a, T> Drop for RecvSlice<'a, T> { self.0.persist_head(self.1); } } + +struct Acquire<'a, T> { + receiver: &'a mut Receiver, +} + +impl<'a, T> Future for Acquire<'a, T> { + type Output = Result<()>; + + #[inline] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.receiver.poll_slice(cx) { + Poll::Ready(v) => Poll::Ready(v.map(|_| ())), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/quic/s2n-quic-core/src/sync/spsc/send.rs b/quic/s2n-quic-core/src/sync/spsc/send.rs index bfa549544c..108f160601 100644 --- a/quic/s2n-quic-core/src/sync/spsc/send.rs +++ b/quic/s2n-quic-core/src/sync/spsc/send.rs @@ -2,7 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use super::{state::Side, Cursor, PushError, Result, State}; -use core::task::{Context, Poll}; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; pub struct Sender(pub(super) State); @@ -12,6 +16,21 @@ impl Sender { self.0.cursor.capacity() } + /// Returns the currently acquired slice of entries for the sender + /// + /// Callers should call [`Self::acquire`] or [`Self::poll_slice`] before calling this method. + #[inline] + pub fn slice(&mut self) -> SendSlice { + let cursor = self.0.cursor; + SendSlice(&mut self.0, cursor) + } + + /// Blocks until at least one entry is available for sending + #[inline] + pub async fn acquire(&mut self) -> Result<()> { + Acquire { sender: self }.await + } + #[inline] pub fn poll_slice(&mut self, cx: &mut Context) -> Poll>> { macro_rules! acquire_capacity { @@ -121,3 +140,19 @@ impl<'a, T> Drop for SendSlice<'a, T> { self.0.persist_tail(self.1); } } + +struct Acquire<'a, T> { + sender: &'a mut Sender, +} + +impl<'a, T> Future for Acquire<'a, T> { + type Output = Result<()>; + + #[inline] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.sender.poll_slice(cx) { + Poll::Ready(v) => Poll::Ready(v.map(|_| ())), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/quic/s2n-quic-core/src/sync/spsc/state.rs b/quic/s2n-quic-core/src/sync/spsc/state.rs index d1072d1ec8..fc34c2e7c3 100644 --- a/quic/s2n-quic-core/src/sync/spsc/state.rs +++ b/quic/s2n-quic-core/src/sync/spsc/state.rs @@ -1,10 +1,8 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use super::{ - primitive::{AtomicBool, AtomicUsize, AtomicWaker, IsZst, Ordering}, - Cell, ClosedError, Result, Slice, -}; +use super::{Cell, ClosedError, Result, Slice}; +use crate::sync::primitive::{AtomicBool, AtomicUsize, AtomicWaker, IsZst, Ordering}; use alloc::alloc::Layout; use cache_padded::CachePadded; use core::{ diff --git a/quic/s2n-quic-core/src/sync/spsc/tests.rs b/quic/s2n-quic-core/src/sync/spsc/tests.rs index d63dbdc5bf..0bac2ff903 100644 --- a/quic/s2n-quic-core/src/sync/spsc/tests.rs +++ b/quic/s2n-quic-core/src/sync/spsc/tests.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::*; +use crate::testing::loom; use bolero::{check, generator::*}; use core::task::{Context, Poll, Waker}; use futures_test::task::{new_count_waker, AwokenCount}; @@ -306,19 +307,6 @@ fn alloc_test() { }) } -#[cfg(not(loom))] -mod loom { - pub use std::*; - - pub mod future { - pub use futures::executor::block_on; - } - - pub fn model R, R>(f: F) -> R { - f() - } -} - const CAPACITY: usize = if cfg!(loom) { 2 } else { 10 }; const BATCH_COUNT: usize = if cfg!(loom) { 2 } else { 100 }; const BATCH_SIZE: usize = if cfg!(loom) { 3 } else { 20 }; diff --git a/quic/s2n-quic-core/src/sync/worker.rs b/quic/s2n-quic-core/src/sync/worker.rs new file mode 100644 index 0000000000..54d1c60107 --- /dev/null +++ b/quic/s2n-quic-core/src/sync/worker.rs @@ -0,0 +1,217 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::sync::primitive::{Arc, AtomicUsize, AtomicWaker, Ordering}; +use cache_padded::CachePadded; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// Creates a worker channel with a Sender and Receiver +pub fn channel() -> (Sender, Receiver) { + let state = Arc::new(State::default()); + let sender = Sender(state.clone()); + let receiver = Receiver { state, credits: 0 }; + (sender, receiver) +} + +/// A handle to the receiver side of the worker channel +/// +/// This handle is used by the worker to wake up when there is work to do. +pub struct Receiver { + state: Arc, + credits: usize, +} + +impl Receiver { + /// Acquires work to be processed for the Receiver + /// + /// `None` is returned when there are no more active Senders. + #[inline] + pub async fn acquire(&mut self) -> Option { + Acquire(self).await + } + + /// Polls work to be processed for the receiver + /// + /// `None` is returned when there are no more active Senders. + #[inline] + pub fn poll_acquire(&mut self, cx: &mut Context) -> Poll> { + let state = &*self.state; + + macro_rules! acquire { + () => {{ + // take the credits that we've been given by the senders + self.credits += state.remaining.swap(0, Ordering::Acquire); + + // if we have any credits then return + if self.credits > 0 { + return Poll::Ready(Some(self.credits)); + } + }}; + } + + // first try to acquire credits + acquire!(); + + // if we didn't get any credits then register the waker + state.receiver.register(cx.waker()); + + // make one last effort to acquire credits in case a sender submitted some while we were + // registering the waker + acquire!(); + + // If we're the only ones with a handle to the state then we're done + if state.senders.load(Ordering::Acquire) == 0 { + return Poll::Ready(None); + } + + Poll::Pending + } + + /// Marks `count` jobs as finished + #[inline] + pub fn finish(&mut self, count: usize) { + debug_assert!(self.credits >= count); + // decrement the number of credits we have + self.credits -= count; + } +} + +/// A handle to submit work to be done to a worker receiver +/// +/// Multiple Sender handles can be created with `.clone()`. +#[derive(Clone)] +pub struct Sender(Arc); + +impl Sender { + /// Submits `count` jobs to be executed by the worker receiver + #[inline] + pub fn submit(&self, count: usize) { + let state = &*self.0; + + // increment the work counter + state.remaining.fetch_add(count, Ordering::Release); + + // wake up the receiver if possible + state.receiver.wake(); + } +} + +impl Drop for Sender { + #[inline] + fn drop(&mut self) { + let state = &*self.0; + + state.senders.fetch_sub(1, Ordering::Release); + + // wake up the receiver to notify that one of the senders has dropped + state.receiver.wake(); + } +} + +struct State { + remaining: CachePadded, + receiver: AtomicWaker, + senders: CachePadded, +} + +impl Default for State { + fn default() -> Self { + Self { + remaining: Default::default(), + receiver: Default::default(), + senders: AtomicUsize::new(1).into(), + } + } +} + +struct Acquire<'a>(&'a mut Receiver); + +impl<'a> Future for Acquire<'a> { + type Output = Option; + + #[inline] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.0.poll_acquire(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::testing::loom; + + fn loom_scenario(iterations: usize, send_batch_size: usize, recv_batch_size: usize) { + assert_ne!(send_batch_size, 0); + assert_ne!(recv_batch_size, 0); + + loom::model(move || { + let (send, mut recv) = channel(); + + let sender = loom::thread::spawn(move || { + for _ in 0..iterations { + send.submit(send_batch_size); + loom::hint::spin_loop(); + } + }); + + let receiver = loom::thread::spawn(move || { + loom::future::block_on(async move { + let mut total = 0; + while let Some(mut count) = recv.acquire().await { + assert_ne!(count, 0); + + while count > 0 { + let to_finish = count.min(recv_batch_size); + recv.finish(to_finish); + total += to_finish; + count -= to_finish; + } + } + + assert_eq!(total, iterations * send_batch_size); + }) + }); + + // loom tests will still run after returning so we don't need to join + if cfg!(not(loom)) { + sender.join().unwrap(); + receiver.join().unwrap(); + } + }); + } + + /// Async loom tests seem to spin forever if the number of iterations is higher than 1. + /// Ideally, this value would be a bit bigger to test more permutations of orderings. + const ITERATIONS: usize = if cfg!(loom) { 1 } else { 100 }; + const SEND_BATCH_SIZE: usize = if cfg!(loom) { 2 } else { 8 }; + const RECV_BATCH_SIZE: usize = if cfg!(loom) { 2 } else { 8 }; + + #[test] + fn loom_no_items() { + loom_scenario(0, 1, 1); + } + + #[test] + fn loom_single_item() { + loom_scenario(ITERATIONS, 1, 1); + } + + #[test] + fn loom_send_batch() { + loom_scenario(ITERATIONS, SEND_BATCH_SIZE, 1); + } + + #[test] + fn loom_recv_batch() { + loom_scenario(ITERATIONS, 1, RECV_BATCH_SIZE); + } + + #[test] + fn loom_both_batch() { + loom_scenario(ITERATIONS, SEND_BATCH_SIZE, RECV_BATCH_SIZE); + } +} diff --git a/quic/s2n-quic-core/src/testing.rs b/quic/s2n-quic-core/src/testing.rs index 9cdba7606c..fcfe6f5eea 100644 --- a/quic/s2n-quic-core/src/testing.rs +++ b/quic/s2n-quic-core/src/testing.rs @@ -24,3 +24,19 @@ impl core::ops::DerefMut for InlineVec { &mut self.values[..self.len] } } + +#[cfg(all(test, not(loom)))] +pub mod loom { + pub use std::*; + + pub mod future { + pub use futures::executor::block_on; + } + + pub fn model R, R>(f: F) -> R { + f() + } +} + +#[cfg(all(test, loom))] +pub use loom;