Skip to content

Commit

Permalink
feat(s2n-quic-core): add spsc channel (#1614)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Feb 3, 2023
1 parent ef0f3db commit a1e7606
Show file tree
Hide file tree
Showing 24 changed files with 1,829 additions and 70 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -682,3 +682,30 @@ jobs:
name: "dhat / report"
status: "success"
url: "${{ steps.s3.outputs.URL }}"

loom:
runs-on: ubuntu-latest
strategy:
matrix:
crate: [quic/s2n-quic-core]
steps:
- uses: actions/checkout@v3
with:
submodules: true

- uses: actions-rs/[email protected]
id: toolchain
with:
toolchain: stable
profile: minimal
override: true

- uses: camshaft/rust-cache@v1
with:
key: ${{ matrix.crate }}

- name: ${{ matrix.crate }}
# run the tests with release mode since some of the loom models can be expensive
run: cd ${{ matrix.crate }} && cargo test --release loom
env:
RUSTFLAGS: --cfg loom -Cdebug-assertions
1 change: 1 addition & 0 deletions quic/s2n-quic-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ publish = false

[dependencies]
criterion = { version = "0.4", features = ["html_reports"] }
crossbeam-channel = { version = "0.5" }
s2n-codec = { path = "../../common/s2n-codec", features = ["testing"] }
s2n-quic-core = { path = "../s2n-quic-core", features = ["testing"] }
s2n-quic-crypto = { path = "../s2n-quic-crypto", features = ["testing"] }
Expand Down
2 changes: 2 additions & 0 deletions quic/s2n-quic-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ mod buffer;
mod crypto;
mod frame;
mod packet;
mod sync;
mod varint;

pub fn benchmarks(c: &mut Criterion) {
buffer::benchmarks(c);
crypto::benchmarks(c);
frame::benchmarks(c);
packet::benchmarks(c);
sync::benchmarks(c);
varint::benchmarks(c);
}
80 changes: 80 additions & 0 deletions quic/s2n-quic-bench/src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use criterion::{BenchmarkId, Criterion, Throughput};
use crossbeam_channel::bounded;
use s2n_quic_core::sync::spsc;

pub fn benchmarks(c: &mut Criterion) {
spsc_benches(c);
}

fn spsc_benches(c: &mut Criterion) {
let mut group = c.benchmark_group("spsc");

for i in [1, 64, 1024, 4096] {
group.throughput(Throughput::Elements(i as _));
group.bench_with_input(BenchmarkId::new("s2n/send_recv", i), &i, |b, input| {
let (mut sender, mut receiver) = spsc::channel(*input);
b.iter(|| {
{
let mut slice = sender.try_slice().unwrap().unwrap();
while slice.push(123usize).is_ok() {}
}

{
let mut slice = receiver.try_slice().unwrap().unwrap();
while slice.pop().is_some() {}
}
});
});
group.bench_with_input(
BenchmarkId::new("crossbeam/send_recv", i),
&i,
|b, input| {
let (sender, receiver) = bounded(*input);
b.iter(|| {
{
while sender.try_send(123usize).is_ok() {}
}

{
while receiver.try_recv().is_ok() {}
}
});
},
);

group.bench_with_input(BenchmarkId::new("s2n/send_recv_iter", i), &i, |b, input| {
let (mut sender, mut receiver) = spsc::channel(*input);
b.iter(|| {
{
let mut slice = sender.try_slice().unwrap().unwrap();
let _ = slice.extend(&mut core::iter::repeat(123usize));
}

{
let mut slice = receiver.try_slice().unwrap().unwrap();
slice.clear();
}
});
});
group.bench_with_input(
BenchmarkId::new("crossbeam/send_recv_iter", i),
&i,
|b, input| {
let (sender, receiver) = bounded(*input);
b.iter(|| {
{
while sender.try_send(123usize).is_ok() {}
}

{
for _ in receiver.try_iter() {}
}
});
},
);
}
group.finish();
}
8 changes: 7 additions & 1 deletion quic/s2n-quic-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ exclude = ["corpus.tar.gz"]

[features]
default = ["alloc", "std"]
alloc = []
alloc = ["atomic-waker", "cache-padded"]
std = ["alloc", "once_cell"]
testing = ["std", "generator", "s2n-codec/testing", "checked-counters", "insta", "futures-test"]
generator = ["bolero-generator"]
checked-counters = []
event-tracing = ["tracing"]

[dependencies]
atomic-waker = { version = "1", optional = true }
bolero-generator = { version = "0.8", default-features = false, optional = true }
byteorder = { version = "1", default-features = false }
bytes = { version = "1", default-features = false }
cache-padded = { version = "1", optional = true }
hex-literal = "0.3"
# used for event snapshot testing - needs an internal API so we require a minimum version
insta = { version = ">=1.12", features = ["json"], optional = true }
Expand All @@ -40,7 +42,11 @@ once_cell = { version = "1", optional = true }
bolero = "0.8"
bolero-generator = { version = "0.8", default-features = false }
insta = { version = "1", features = ["json"] }
futures = "0.3"
futures-test = "0.3"
ip_network = "0.4"
plotters = { version = "0.3", default-features = false, features = ["svg_backend", "line_series"] }
s2n-codec = { path = "../../common/s2n-codec", features = ["testing"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5", features = ["checkpoint", "futures"] }
28 changes: 28 additions & 0 deletions quic/s2n-quic-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,30 @@
#[cfg(feature = "alloc")]
extern crate alloc;

/// Asserts that a boolean expression is true at runtime, only if debug_assertions are enabled.
///
/// Otherwise, the compiler is told to assume that the expression is always true and can perform
/// additional optimizations.
///
/// # Safety
///
/// The caller _must_ ensure this condition is never possible, otherwise the compiler
/// may optimize based on false assumptions and behave incorrectly.
#[macro_export]
macro_rules! assume {
($cond:expr) => {
$crate::assume!($cond, "assumption failed: {}", stringify!($cond));
};
($cond:expr $(, $fmtarg:expr)* $(,)?) => {
let v = $cond;

debug_assert!(v $(, $fmtarg)*);
if cfg!(not(debug_assertions)) && !v {
core::hint::unreachable_unchecked();
}
};
}

pub mod ack;
pub mod application;
#[cfg(feature = "alloc")]
Expand Down Expand Up @@ -33,8 +57,12 @@ pub mod recovery;
pub mod slice;
pub mod stateless_reset;
pub mod stream;
pub mod sync;
pub mod time;
pub mod token;
pub mod transmission;
pub mod transport;
pub mod varint;

#[cfg(any(test, feature = "testing"))]
pub mod testing;
25 changes: 2 additions & 23 deletions quic/s2n-quic-core/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ where
#[cfg(test)]
mod tests {
use super::*;
use bolero::{check, generator::*};
use crate::testing::InlineVec;
use bolero::check;

fn assert_eq_slices<A, B, T>(a: &[A], b: &[B])
where
Expand Down Expand Up @@ -128,28 +129,6 @@ mod tests {
}
}

#[derive(Clone, Copy, Debug, TypeGenerator)]
struct InlineVec<T, const LEN: usize> {
values: [T; LEN],

#[generator(_code = "0..LEN")]
len: usize,
}

impl<T, const LEN: usize> core::ops::Deref for InlineVec<T, LEN> {
type Target = [T];

fn deref(&self) -> &Self::Target {
&self.values[..self.len]
}
}

impl<T, const LEN: usize> core::ops::DerefMut for InlineVec<T, LEN> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.values[..self.len]
}
}

const LEN: usize = if cfg!(kani) { 2 } else { 32 };

#[test]
Expand Down
5 changes: 5 additions & 0 deletions quic/s2n-quic-core/src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

#[cfg(feature = "alloc")]
pub mod spsc;
Git LFS file not shown
42 changes: 42 additions & 0 deletions quic/s2n-quic-core/src/sync/spsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

mod primitive;
mod recv;
mod send;
mod slice;
mod state;

use slice::*;
use state::*;

pub use recv::{Receiver, RecvSlice};
pub use send::{SendSlice, Sender};

#[inline]
pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let state = State::new(capacity);
let sender = Sender(state.clone());
let receiver = Receiver(state);
(sender, receiver)
}

#[cfg(test)]
mod tests;

type Result<T, E = ClosedError> = core::result::Result<T, E>;

#[derive(Clone, Copy, Debug)]
pub struct ClosedError;

#[derive(Clone, Copy, Debug)]
pub enum PushError<T> {
Full(T),
Closed,
}

impl<T> From<ClosedError> for PushError<T> {
fn from(_error: ClosedError) -> Self {
Self::Closed
}
}
53 changes: 53 additions & 0 deletions quic/s2n-quic-core/src/sync/spsc/primitive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

#[cfg(all(loom, test))]
mod loom {
use ::core::task::Waker;
use ::loom::future::AtomicWaker as Inner;

pub use ::loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

#[derive(Debug, Default)]
pub struct AtomicWaker(Inner);

impl AtomicWaker {
pub fn new() -> Self {
Self(Inner::new())
}

pub fn wake(&self) {
self.0.wake();
}

pub fn take(&self) -> Option<Waker> {
self.0.take_waker()
}

pub fn register(&self, waker: &Waker) {
self.0.register_by_ref(waker);
}
}
}

#[cfg(all(loom, test))]
pub use self::loom::*;

mod core {
pub use ::core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
pub use atomic_waker::AtomicWaker;
}

#[cfg(not(all(loom, test)))]
pub use self::core::*;

/// Indicates if the type is a zero-sized type
///
/// This can be used to optimize the code to avoid needless calculations.
pub trait IsZst {
const IS_ZST: bool;
}

impl<T> IsZst for T {
const IS_ZST: bool = ::core::mem::size_of::<T>() == 0;
}
Loading

0 comments on commit a1e7606

Please sign in to comment.