Skip to content

Commit

Permalink
feat(s2n-quic-core): add sync::worker module (#1728)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Apr 27, 2023
1 parent 61220a3 commit 992ca37
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 26 deletions.
6 changes: 6 additions & 0 deletions quic/s2n-quic-core/src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

#[cfg(feature = "alloc")]
mod primitive;

#[cfg(feature = "alloc")]
pub mod spsc;

#[cfg(feature = "alloc")]
pub mod worker;
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

#[cfg(all(loom, test))]
mod loom {
mod loom_primitive {
use ::core::task::Waker;
use ::loom::future::AtomicWaker as Inner;

pub use ::loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
pub use ::loom::sync::{atomic::*, Arc};

#[derive(Debug, Default)]
pub struct AtomicWaker(Inner);
Expand All @@ -31,15 +31,16 @@ mod loom {
}

#[cfg(all(loom, test))]
pub use self::loom::*;
pub use self::loom_primitive::*;

mod core {
pub use ::core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
mod core_primitive {
pub use ::core::sync::atomic::*;
pub use alloc::sync::Arc;
pub use atomic_waker::AtomicWaker;
}

#[cfg(not(all(loom, test)))]
pub use self::core::*;
pub use self::core_primitive::*;

/// Indicates if the type is a zero-sized type
///
Expand Down
1 change: 0 additions & 1 deletion quic/s2n-quic-core/src/sync/spsc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

mod primitive;
mod recv;
mod send;
mod slice;
Expand Down
54 changes: 53 additions & 1 deletion quic/s2n-quic-core/src/sync/spsc/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use super::{state::Side, Cursor, Result, State};
use core::task::{Context, Poll};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

pub struct Receiver<T>(pub(super) State<T>);

Expand All @@ -27,6 +31,21 @@ impl<T> Receiver<T> {
self.0.cursor.is_full()
}

/// Returns the currently acquired slice of entries for the receiver
///
/// Callers should call [`Self::acquire`] or [`Self::poll_slice`] before calling this method.
#[inline]
pub fn slice(&mut self) -> RecvSlice<T> {
let cursor = self.0.cursor;
RecvSlice(&mut self.0, cursor)
}

/// Blocks until at least one entry is available for consumption
#[inline]
pub async fn acquire(&mut self) -> Result<()> {
Acquire { receiver: self }.await
}

#[inline]
pub fn poll_slice(&mut self, cx: &mut Context) -> Poll<Result<RecvSlice<T>>> {
macro_rules! acquire_filled {
Expand Down Expand Up @@ -124,6 +143,23 @@ impl<'a, T> RecvSlice<'a, T> {
len
}

/// Releases `len` entries back to the sender
#[inline]
pub fn release(&mut self, len: usize) {
let (pair, _) = self.0.as_pairs();

debug_assert!(pair.len() >= len, "cannot release more than was acquired");

for entry in pair.iter().take(len) {
unsafe {
// Safety: the state's cursor indicates that each slot in the `iter` contains data
let _ = entry.take();
}
}

self.0.cursor.increment_head(len);
}

#[inline]
pub fn len(&self) -> usize {
self.0.cursor.recv_len()
Expand All @@ -141,3 +177,19 @@ impl<'a, T> Drop for RecvSlice<'a, T> {
self.0.persist_head(self.1);
}
}

struct Acquire<'a, T> {
receiver: &'a mut Receiver<T>,
}

impl<'a, T> Future for Acquire<'a, T> {
type Output = Result<()>;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.receiver.poll_slice(cx) {
Poll::Ready(v) => Poll::Ready(v.map(|_| ())),
Poll::Pending => Poll::Pending,
}
}
}
37 changes: 36 additions & 1 deletion quic/s2n-quic-core/src/sync/spsc/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use super::{state::Side, Cursor, PushError, Result, State};
use core::task::{Context, Poll};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

pub struct Sender<T>(pub(super) State<T>);

Expand All @@ -12,6 +16,21 @@ impl<T> Sender<T> {
self.0.cursor.capacity()
}

/// Returns the currently acquired slice of entries for the sender
///
/// Callers should call [`Self::acquire`] or [`Self::poll_slice`] before calling this method.
#[inline]
pub fn slice(&mut self) -> SendSlice<T> {
let cursor = self.0.cursor;
SendSlice(&mut self.0, cursor)
}

/// Blocks until at least one entry is available for sending
#[inline]
pub async fn acquire(&mut self) -> Result<()> {
Acquire { sender: self }.await
}

#[inline]
pub fn poll_slice(&mut self, cx: &mut Context) -> Poll<Result<SendSlice<T>>> {
macro_rules! acquire_capacity {
Expand Down Expand Up @@ -121,3 +140,19 @@ impl<'a, T> Drop for SendSlice<'a, T> {
self.0.persist_tail(self.1);
}
}

struct Acquire<'a, T> {
sender: &'a mut Sender<T>,
}

impl<'a, T> Future for Acquire<'a, T> {
type Output = Result<()>;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.sender.poll_slice(cx) {
Poll::Ready(v) => Poll::Ready(v.map(|_| ())),
Poll::Pending => Poll::Pending,
}
}
}
6 changes: 2 additions & 4 deletions quic/s2n-quic-core/src/sync/spsc/state.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::{
primitive::{AtomicBool, AtomicUsize, AtomicWaker, IsZst, Ordering},
Cell, ClosedError, Result, Slice,
};
use super::{Cell, ClosedError, Result, Slice};
use crate::sync::primitive::{AtomicBool, AtomicUsize, AtomicWaker, IsZst, Ordering};
use alloc::alloc::Layout;
use cache_padded::CachePadded;
use core::{
Expand Down
14 changes: 1 addition & 13 deletions quic/s2n-quic-core/src/sync/spsc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::*;
use crate::testing::loom;
use bolero::{check, generator::*};
use core::task::{Context, Poll, Waker};
use futures_test::task::{new_count_waker, AwokenCount};
Expand Down Expand Up @@ -306,19 +307,6 @@ fn alloc_test() {
})
}

#[cfg(not(loom))]
mod loom {
pub use std::*;

pub mod future {
pub use futures::executor::block_on;
}

pub fn model<F: FnOnce() -> R, R>(f: F) -> R {
f()
}
}

const CAPACITY: usize = if cfg!(loom) { 2 } else { 10 };
const BATCH_COUNT: usize = if cfg!(loom) { 2 } else { 100 };
const BATCH_SIZE: usize = if cfg!(loom) { 3 } else { 20 };
Expand Down
Loading

0 comments on commit 992ca37

Please sign in to comment.