From 2abfdf0d9e2dd6a55a0b17037fbc6acd73acf1b2 Mon Sep 17 00:00:00 2001
From: Cameron Bytheway
Date: Wed, 26 Apr 2023 15:54:36 -0600
Subject: [PATCH 1/5] feat(s2n-quic-xdp): add async tasks

---
 tools/xdp/s2n-quic-xdp/Cargo.toml             |   6 +
 tools/xdp/s2n-quic-xdp/src/if_xdp.rs          |  21 +
 tools/xdp/s2n-quic-xdp/src/lib.rs             |   2 +
 tools/xdp/s2n-quic-xdp/src/ring.rs            | 108 +++++
 tools/xdp/s2n-quic-xdp/src/ring/cursor.rs     |  23 +-
 tools/xdp/s2n-quic-xdp/src/socket.rs          |  17 +-
 tools/xdp/s2n-quic-xdp/src/task.rs            |  37 ++
 .../s2n-quic-xdp/src/task/completion_to_tx.rs | 456 ++++++++++++++++++
 .../src/task/completion_to_tx/assign.rs       | 240 +++++++++
 tools/xdp/s2n-quic-xdp/src/task/rx.rs         | 407 ++++++++++++++++
 tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs | 298 ++++++++++++
 tools/xdp/s2n-quic-xdp/src/task/testing.rs    |  24 +
 tools/xdp/s2n-quic-xdp/src/task/tx.rs         | 195 ++++++++
 13 files changed, 1827 insertions(+), 7 deletions(-)
 create mode 100644 tools/xdp/s2n-quic-xdp/src/task.rs
 create mode 100644 tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs
 create mode 100644 tools/xdp/s2n-quic-xdp/src/task/completion_to_tx/assign.rs
 create mode 100644 tools/xdp/s2n-quic-xdp/src/task/rx.rs
 create mode 100644 tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs
 create mode 100644 tools/xdp/s2n-quic-xdp/src/task/testing.rs
 create mode 100644 tools/xdp/s2n-quic-xdp/src/task/tx.rs

diff --git a/tools/xdp/s2n-quic-xdp/Cargo.toml b/tools/xdp/s2n-quic-xdp/Cargo.toml
index 2271ac0479..3fd0e06978 100644
--- a/tools/xdp/s2n-quic-xdp/Cargo.toml
+++ b/tools/xdp/s2n-quic-xdp/Cargo.toml
@@ -3,11 +3,17 @@ name = "s2n-quic-xdp"
 version = "0.1.0"
 edition = "2021"
 
+[features]
+default = ["tokio"]
+
 [dependencies]
 bitflags = "2"
 errno = "0.3"
 libc = "0.2"
 s2n-quic-core = { path = "../../../quic/s2n-quic-core", version = "0.19" }
+tokio = { version = "1", optional = true }
 
 [dev-dependencies]
 bolero = "0.9"
+rand = "0.8"
+tokio = { version = "1", features = ["full"] }
diff --git a/tools/xdp/s2n-quic-xdp/src/if_xdp.rs b/tools/xdp/s2n-quic-xdp/src/if_xdp.rs
index cc399f1ae0..77d00774bb 100644
--- a/tools/xdp/s2n-quic-xdp/src/if_xdp.rs
+++ b/tools/xdp/s2n-quic-xdp/src/if_xdp.rs
@@ -292,3 +292,24 @@ pub struct UmemDescriptor {
     /// Offset into the umem where the packet starts
     pub address: u64,
 }
+
+impl UmemDescriptor {
+    /// Sets the length for the descriptor and converts it into a [`RxTxDescriptor`]
+    #[inline]
+    pub fn with_len(self, len: u32) -> RxTxDescriptor {
+        RxTxDescriptor {
+            address: self.address,
+            len,
+            options: 0,
+        }
+    }
+}
+
+impl From<RxTxDescriptor> for UmemDescriptor {
+    #[inline]
+    fn from(desc: RxTxDescriptor) -> Self {
+        Self {
+            address: desc.address,
+        }
+    }
+}
diff --git a/tools/xdp/s2n-quic-xdp/src/lib.rs b/tools/xdp/s2n-quic-xdp/src/lib.rs
index 852bb25554..3320d27648 100644
--- a/tools/xdp/s2n-quic-xdp/src/lib.rs
+++ b/tools/xdp/s2n-quic-xdp/src/lib.rs
@@ -15,5 +15,7 @@ mod ring;
 mod socket;
 /// Helpers for making API calls to AF-XDP sockets
 mod syscall;
+/// A set of async tasks responsible for managing ring buffer and queue state
+mod task;
 /// A shared region of memory for holding frame (packet) data
 mod umem;
diff --git a/tools/xdp/s2n-quic-xdp/src/ring.rs b/tools/xdp/s2n-quic-xdp/src/ring.rs
index 9f9850571d..d79d55e66c 100644
--- a/tools/xdp/s2n-quic-xdp/src/ring.rs
+++ b/tools/xdp/s2n-quic-xdp/src/ring.rs
@@ -16,6 +16,10 @@ use cursor::Cursor;
 #[derive(Debug)]
 struct Ring<T: Copy + fmt::Debug> {
     cursor: Cursor<T>,
+    // make the area clonable in test mode
+    #[cfg(test)]
+    area: std::sync::Arc<Mmap>,
+    #[cfg(not(test))]
     area: Mmap,
     socket: socket::Fd,
 }
@@ -53,6 +57,9 @@
macro_rules! impl_producer { cursor.init_producer(); } + #[cfg(test)] + let area = std::sync::Arc::new(area); + Ok(Self(Ring { cursor, area, @@ -95,6 +102,12 @@ macro_rules! impl_producer { self.0.cursor.producer_data() } } + + /// Returns the overall size of the ring + #[inline] + pub fn capacity(&self) -> usize { + self.0.cursor.capacity() as _ + } }; } @@ -120,6 +133,9 @@ macro_rules! impl_consumer { Cursor::new(&area, offsets, size) }; + #[cfg(test)] + let area = std::sync::Arc::new(area); + Ok(Self(Ring { cursor, area, @@ -156,10 +172,22 @@ macro_rules! impl_consumer { self.0.cursor.consumer_data() } } + + /// Returns the overall size of the ring + #[inline] + pub fn capacity(&self) -> usize { + self.0.cursor.capacity() as _ + } + + #[cfg(test)] + pub fn set_flags(&mut self, flags: crate::if_xdp::RingFlags) { + *self.0.cursor.flags_mut() = flags; + } }; } /// A transmission ring for entries to be transmitted +#[derive(Debug)] pub struct Tx(Ring); impl Tx { @@ -167,6 +195,7 @@ impl Tx { } /// A receive ring for entries to be processed +#[derive(Debug)] pub struct Rx(Ring); impl Rx { @@ -174,6 +203,7 @@ impl Rx { } /// The fill ring for entries to be populated +#[derive(Debug)] pub struct Fill(Ring); impl Fill { @@ -181,6 +211,7 @@ impl Fill { } /// The completion ring for entries to be reused for transmission +#[derive(Debug)] pub struct Completion(Ring); impl Completion { @@ -191,3 +222,80 @@ impl Completion { COMPLETION_RING ); } + +#[cfg(test)] +pub mod testing { + use super::*; + use crate::{if_xdp, socket::Fd}; + + fn offsets() -> if_xdp::RingOffsetV2 { + if_xdp::RingOffsetV2 { + producer: 0, + consumer: core::mem::size_of::() as _, + flags: (core::mem::size_of::() * 2) as _, + desc: (core::mem::size_of::() * 3) as _, + } + } + + macro_rules! 
impl_pair { + ($name:ident, $consumer:ident, $producer:ident, $T:ident) => { + /// Creates a pair of rings used for testing + pub fn $name(size: u32) -> ($consumer, $producer) { + assert!(size.is_power_of_two()); + + let offsets = offsets(); + + // start with the descriptor offset as the total length + let mut len = offsets.desc as usize; + // extend the length by the `size` multiplied the entry size + len += size as usize * size_of::<$T>(); + + let area = Mmap::new(len, 0, None).unwrap(); + + let consumer_cursor = unsafe { + // Safety: `area` lives as long as `cursor` + Cursor::new(&area, &offsets, size) + }; + + let mut producer_cursor = unsafe { + // Safety: `area` lives as long as `cursor` + Cursor::new(&area, &offsets, size) + }; + + unsafe { + // Safety: this is only called by a producer + producer_cursor.init_producer(); + } + + let area = std::sync::Arc::new(area); + + let cons = $consumer(Ring { + cursor: consumer_cursor, + area: area.clone(), + socket: Fd::invalid(), + }); + + let prod = $producer(Ring { + cursor: producer_cursor, + area, + socket: Fd::invalid(), + }); + + (cons, prod) + } + }; + } + + impl_pair!(rx_tx, Rx, Tx, RxTxDescriptor); + impl_pair!(completion_fill, Completion, Fill, UmemDescriptor); + + #[test] + fn rx_tx_test() { + let _ = rx_tx(16); + } + + #[test] + fn comp_fill_test() { + let _ = completion_fill(16); + } +} diff --git a/tools/xdp/s2n-quic-xdp/src/ring/cursor.rs b/tools/xdp/s2n-quic-xdp/src/ring/cursor.rs index 5af13dc801..aa2feb819f 100644 --- a/tools/xdp/s2n-quic-xdp/src/ring/cursor.rs +++ b/tools/xdp/s2n-quic-xdp/src/ring/cursor.rs @@ -123,6 +123,11 @@ impl Cursor { unsafe { &*self.consumer.as_ptr() } } + /// Returns the overall size of the ring + pub fn capacity(&self) -> u32 { + self.size + } + /// Acquires a cursor index for a producer half /// /// The `watermark` can be provided to avoid synchronization by reusing the cached cursor @@ -138,7 +143,12 @@ impl Cursor { return free; } - let new_value = self.consumer().load(Ordering::Acquire); + let mut new_value = self.consumer().load(Ordering::Acquire); + + // Our cached copy has the size added so we also need to add the size here when comparing + // + // See `Self::init_producer` for more details + new_value += self.size; if self.cached_consumer.0 == new_value { return free; @@ -146,11 +156,6 @@ impl Cursor { self.cached_consumer.0 = new_value; - unsafe { - // Safety: this is called on the producer side - self.init_producer(); - } - self.cached_len = self.cached_producer_len(); self.cached_len @@ -279,6 +284,12 @@ impl Cursor { unsafe { &*self.flags.as_ptr() } } + /// Returns a reference to the flags on the ring + #[inline] + pub fn flags_mut(&mut self) -> &mut RingFlags { + unsafe { &mut *self.flags.as_ptr() } + } + /// Returns the current consumer entries /// /// # Safety diff --git a/tools/xdp/s2n-quic-xdp/src/socket.rs b/tools/xdp/s2n-quic-xdp/src/socket.rs index f7d04bc8df..5a084dae8d 100644 --- a/tools/xdp/s2n-quic-xdp/src/socket.rs +++ b/tools/xdp/s2n-quic-xdp/src/socket.rs @@ -9,7 +9,7 @@ use std::{ }; /// A structure for reference counting an AF-XDP socket -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq)] pub struct Fd(Arc); impl fmt::Debug for Fd { @@ -28,6 +28,20 @@ impl Fd { let fd = Arc::new(Inner(fd)); Ok(Self(fd)) } + + pub fn attach_umem(&self, umem: &crate::umem::Umem) -> Result<()> { + umem.attach(self)?; + // TODO store the umem + Ok(()) + } + + /// Returns an invalid file descriptor + /// + /// This should only be used in tests to avoid creating an actual socket. 
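+    ///
+    /// Dropping the returned `Fd` will still run `Inner`'s destructor; assuming
+    /// that destructor simply calls `close(2)` on the wrapped value (as its doc
+    /// comment suggests), closing fd `-1` fails harmlessly with `EBADF`, so no
+    /// real resource is touched.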
+    #[cfg(test)]
+    pub fn invalid() -> Self {
+        Self(Arc::new(Inner(-1)))
+    }
 }
 
 impl AsRawFd for Fd {
@@ -38,6 +52,7 @@ impl AsRawFd for Fd {
 }
 
 /// Wrap the RawFd in a structure that automatically closes the socket on drop
+#[derive(PartialEq, Eq)]
 struct Inner(RawFd);
 
 impl Drop for Inner {
diff --git a/tools/xdp/s2n-quic-xdp/src/task.rs b/tools/xdp/s2n-quic-xdp/src/task.rs
new file mode 100644
index 0000000000..a7bcdc3c6a
--- /dev/null
+++ b/tools/xdp/s2n-quic-xdp/src/task.rs
@@ -0,0 +1,37 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+//! A set of async tasks responsible for managing ring buffer and queue state
+//!
+//! Fundamentally, each task takes a set of input sources and routes them to one or more output
+//! queues. Each task is generic over the execution environment, meaning it can be used in
+//! something driven by polling for events, like `tokio`, or spawned on its own thread in a busy
+//! poll loop.
+//!
+//! The ordering of operations in each of the tasks is critical for correctness. It's very easy to
+//! get into a deadlock if things aren't exactly right. As such, each task has a fuzz test that
+//! tries to show the tasks working properly, even in extreme cases.
+
+/// Emits a log line if the `s2n_quic_xdp_trace` cfg option is enabled. Otherwise, the trace is a
+/// no-op.
+macro_rules! trace {
+    ($($fmt:tt)*) => {{
+        if cfg!(s2n_quic_xdp_trace) {
+            let args = format!($($fmt)*);
+            println!("{}:{}: {}", module_path!(), line!(), args);
+        }
+    }}
+}
+
+pub mod completion_to_tx;
+pub mod rx;
+pub mod rx_to_fill;
+pub mod tx;
+
+#[cfg(test)]
+mod testing;
+
+pub use completion_to_tx::completion_to_tx;
+pub use rx::rx;
+pub use rx_to_fill::rx_to_fill;
+pub use tx::tx;
diff --git a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs
new file mode 100644
index 0000000000..8431bd0b75
--- /dev/null
+++ b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs
@@ -0,0 +1,456 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::{if_xdp::UmemDescriptor, ring};
+use core::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+use s2n_quic_core::sync::{spsc, worker};
+
+mod assign;
+
+type Sender = spsc::Sender<UmemDescriptor>;
+
+/// Takes descriptors from the completion queue and forwards them to individual workers
+pub async fn completion_to_tx<P: Poller>(
+    poller: P,
+    comp: ring::Completion,
+    frame_size: u32,
+    mut tx_queues: Vec<Sender>,
+) {
+    for tx_queue in &tx_queues {
+        assert!(
+            tx_queue.capacity() >= comp.capacity(),
+            "tx queues should have at least as much capacity as the completion queue to avoid dropping descriptors"
+        );
+    }
+
+    // create a different future based on the arguments
+    match (
+        tx_queues.len(),
+        frame_size.is_power_of_two(),
+        tx_queues.len().is_power_of_two(),
+    ) {
+        (0, _, _) => panic!("invalid tx_queues size"),
+        (1, _, _) => {
+            trace!("using single queue mode");
+            CompletionRingToTx {
+                txs: tx_queues.pop().unwrap(),
+                comp,
+                poller,
+                assignment: (),
+            }
+            .await;
+        }
+        (len, true, true) => {
+            trace!("using fully-aligned mode with {len} queues");
+            CompletionRingToTx {
+                txs: tx_queues,
+                comp,
+                poller,
+                assignment: assign::AssignGeneric {
+                    frame: assign::AlignedFrame::new(frame_size),
+                    index: assign::AlignedQueue::new(len),
+                },
+            }
+            .await;
+        }
+        (len, true, false) => {
+            trace!("using frame-aligned mode with {len} queues");
+            CompletionRingToTx {
+                txs: tx_queues,
+                comp,
+                poller,
+                assignment: assign::AssignGeneric {
+                    frame: assign::AlignedFrame::new(frame_size),
+                    index: assign::UnalignedQueue::new(len),
+                },
+            }
+            .await;
+        }
+        (len, false, true) => {
+            trace!("using queue-aligned mode with {len} queues");
+            CompletionRingToTx {
+                txs: tx_queues,
+                comp,
+                poller,
+                assignment: assign::AssignGeneric {
+                    frame: assign::UnalignedFrame::new(frame_size),
+                    index: assign::AlignedQueue::new(len),
+                },
+            }
+            .await;
+        }
+        (len, false, false) => {
+            trace!("using unaligned mode with {len} queues");
+            CompletionRingToTx {
+                txs: tx_queues,
+                comp,
+                poller,
+                assignment: assign::AssignGeneric {
+                    frame: assign::UnalignedFrame::new(frame_size),
+                    index: assign::UnalignedQueue::new(len),
+                },
+            }
+            .await;
+        }
+    }
+}
+
+/// Polls the completion queue for progress
+pub trait Poller: Unpin {
+    fn poll(&mut self, comp: &mut ring::Completion, cx: &mut Context) -> Poll<Option<u32>>;
+    fn release(&mut self, comp: &mut ring::Completion, count: usize);
+}
+
+impl Poller for () {
+    #[inline]
+    fn poll(&mut self, comp: &mut ring::Completion, cx: &mut Context) -> Poll<Option<u32>> {
+        // In this mode we are busy polling so wake ourselves up on every iteration
+        cx.waker().wake_by_ref();
+
+        // try to acquire entries from the completion queue
+        let count = comp.acquire(1);
+
+        trace!("acquired {count} items from the completion queue");
+
+        if count > 0 {
+            Poll::Ready(Some(count))
+        } else {
+            Poll::Pending
+        }
+    }
+
+    #[inline]
+    fn release(&mut self, comp: &mut ring::Completion, count: usize) {
+        trace!("releasing {count} items to the completion queue");
+        if count > 0 {
+            // release the number of consumed items to the completion queue
+            comp.release(count as _);
+        }
+    }
+}
+
+impl Poller for worker::Receiver {
+    #[inline]
+    fn poll(&mut self, comp: &mut ring::Completion, cx: &mut Context) -> Poll<Option<u32>> {
+        // try to acquire some work from the producers
+        let credits = match self.poll_acquire(cx) {
+            Poll::Ready(Some(count)) => count as u32,
+            Poll::Ready(None) => {
+                // there are no producers left so we're closing
+                return Poll::Ready(None);
+            }
+            Poll::Pending => {
+                // there's no
work to be done so yield and wait for a producer to wake us up + return Poll::Pending; + } + }; + + trace!("acquired {credits} worker credits"); + + // acquire entries from the completion queue + let actual = comp.acquire(credits); + trace!("acquired {actual} entries from the completion queue"); + + // just in case there's a race between the work items count and the completion queue we'll + // take the minimum here. + let actual = actual.min(credits); + + // we need to make sure to wake back up so we can query to see if there's work to be done + cx.waker().wake_by_ref(); + + if actual > 0 { + Poll::Ready(Some(actual)) + } else { + Poll::Pending + } + } + + #[inline] + fn release(&mut self, comp: &mut ring::Completion, count: usize) { + trace!("releasing {count} entries to the completion queue"); + + if count > 0 { + // release the number of consumed items to the completion queue + comp.release(count as _); + + // mark `count` number of items as complete + self.finish(count); + } + } +} + +/// A group of TX queues that are responsible for filling packets +trait Txs: Unpin { + /// Iterates over all of the queues in the group + fn for_each(&mut self, f: F); + /// Returns the number of queues + fn len(&self) -> usize; +} + +impl Txs for Vec { + #[inline] + fn for_each(&mut self, mut f: F) { + for s in self.iter_mut() { + f(s); + } + } + + #[inline] + fn len(&self) -> usize { + Vec::len(self) + } +} + +impl Txs for Sender { + #[inline] + fn for_each(&mut self, mut f: F) { + f(self); + } + + #[inline] + fn len(&self) -> usize { + 1 + } +} + +struct CompletionRingToTx { + txs: T, + comp: ring::Completion, + poller: P, + assignment: A, +} + +impl Future for CompletionRingToTx { + type Output = (); + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { + let Self { + txs, + comp, + poller, + assignment, + } = self.get_mut(); + + trace!("polling completion ring to tx"); + + // try to query if we have any ready items + let count = match poller.poll(comp, cx) { + Poll::Ready(Some(count)) => { + // we're ready, keep going + count + } + Poll::Ready(None) => { + // shut down the task + return Poll::Ready(()); + } + Poll::Pending => { + // nothing to do right now + return Poll::Pending; + } + }; + + let (head, tail) = comp.data(); + + let mut sent = 0; + let mut closed = 0; + + let mut idx = 0; + txs.for_each(|tx| { + match tx.try_slice() { + Ok(Some(mut slice)) => { + /// copies the completion items into the worker's queue + macro_rules! 
extend { + ($name:ident) => { + if !$name.is_empty() { + let mut iter = $name + .iter() + .take(count as _) + .copied() + .filter(|desc| assignment.assign(*desc, idx)) + .map(|desc| { + trace!("assigning address {} to queue {idx}", desc.address); + + sent += 1; + desc + }) + .peekable(); + + while iter.peek().is_some() { + if slice.extend(&mut iter).is_err() { + trace!("tx queue {idx} is closed"); + closed += 1; + idx += 1; + return; + } + } + } + }; + } + + extend!(head); + extend!(tail); + } + Ok(None) => { + unreachable!("tx queue capacity should exceed that of the completion queue"); + } + Err(_) => { + trace!("tx queue {idx} closed"); + closed += 1; + } + } + + idx += 1; + }); + + // let the poller know how many items we consumed + poller.release(comp, sent); + + // if all of the queues are closed then shut down the task + if closed == txs.len() { + trace!("all tx queues closed; shutting down"); + return Poll::Ready(()); + } + + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + if_xdp::UmemDescriptor, + task::testing::{random_delay, QUEUE_SIZE, TEST_ITEMS}, + }; + use rand::prelude::*; + use tokio::sync::oneshot; + + async fn execute_test(workers: usize, frame_size: u32) { + let channel_size = QUEUE_SIZE; + let worker_total = TEST_ITEMS / workers; + let expected_total = worker_total * workers; + + let mut worker_channels = vec![]; + let mut worker_done = vec![]; + + for idx in 0..workers { + let (rx_send, mut rx_recv) = spsc::channel::(channel_size); + let (done_send, done_recv) = oneshot::channel(); + + worker_channels.push(rx_send); + worker_done.push(done_recv); + + tokio::spawn(async move { + let mut total = 0; + let mut expected = (idx as u64..) + .step_by(workers) + .map(|v| v * frame_size as u64); + + while rx_recv.acquire().await.is_ok() { + let mut slice = rx_recv.slice(); + + while let Some(entry) = slice.pop() { + trace!("queue {idx} received address {}", entry.address); + + assert_eq!( + entry.address, + expected.next().unwrap(), + "address does not match the expected value" + ); + total += 1; + } + } + + trace!("all queue items for {idx} received; shutting down"); + + done_send.send(total).unwrap(); + }); + } + + let (ring_rx, mut ring_tx) = ring::testing::completion_fill(channel_size as u32); + let (worker_send, worker_recv) = worker::channel(); + + tokio::spawn(completion_to_tx( + worker_recv, + ring_rx, + frame_size, + worker_channels, + )); + + tokio::spawn(async move { + let mut addresses = (0..expected_total as u64) + .map(|address| UmemDescriptor { + address: address * frame_size as u64, + }) + .peekable(); + + let mut total = 0; + + while addresses.peek().is_some() { + let count = ring_tx.acquire(1); + + trace!("acquired {count} TX ring entries"); + + if count == 0 { + random_delay().await; + continue; + } + + let batch_size = thread_rng().gen_range(1..=count); + trace!("TX batch size set to {batch_size}"); + + let (head, tail) = ring_tx.data(); + + let mut sent = 0; + for (desc, dest) in (&mut addresses) + .take(batch_size as _) + .zip(head.iter_mut().chain(tail)) + { + trace!("sending address {}", desc.address); + *dest = desc; + sent += 1; + } + + trace!("sent {sent} items"); + + ring_tx.release(sent as _); + worker_send.submit(sent); + total += sent; + + random_delay().await; + } + + trace!("all items sent; shutting down"); + + assert_eq!(total, expected_total); + }); + + let mut actual_total = 0; + + for done_recv in worker_done { + actual_total += done_recv.await.unwrap(); + } + + assert_eq!(expected_total as u64, 
actual_total); + } + + #[tokio::test] + async fn single_worker() { + execute_test(1, 4096).await; + } + + #[tokio::test] + async fn multiple_worker_aligned() { + execute_test(4, 16).await; + } + + #[tokio::test] + async fn multiple_worker_unaligned() { + execute_test(4, 17).await; + } +} diff --git a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx/assign.rs b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx/assign.rs new file mode 100644 index 0000000000..3215cf16ff --- /dev/null +++ b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx/assign.rs @@ -0,0 +1,240 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::if_xdp::UmemDescriptor; + +/// Trait to define how descriptors are assigned to TX workers +/// +/// As the Completion ring is global for the entire socket, it is up to the application to decide +/// which TX queues get which descriptors. This trait takes in a descriptor and decides if it +/// pertains to a worker index or not. +pub trait Assign: Unpin { + fn assign(&self, desc: UmemDescriptor, idx: u64) -> bool; +} + +impl Assign for () { + #[inline] + fn assign(&self, _desc: UmemDescriptor, idx: u64) -> bool { + debug_assert_eq!( + idx, 0, + "assignment mode should only be used for single queue workflows" + ); + + // only assign descriptors to the first worker + idx == 0 + } +} + +/// Assignment strategy that is generic over framing and index alignment +pub struct AssignGeneric { + pub frame: F, + pub index: I, +} + +impl Assign for AssignGeneric { + #[inline] + fn assign(&self, desc: UmemDescriptor, idx: u64) -> bool { + let v = self.frame.frame_to_index(desc); + let v = self.index.index_to_queue(v); + v == idx + } +} + +/// Converts a frame address into a frame index +pub trait FrameToIndex: Unpin { + fn frame_to_index(&self, desc: UmemDescriptor) -> u64; +} + +pub struct AlignedFrame { + shift: u32, +} + +impl AlignedFrame { + pub fn new(frame_size: u32) -> Self { + debug_assert!(frame_size.is_power_of_two()); + + let shift = frame_size.trailing_zeros(); + + debug_assert_eq!( + frame_size, + 2u32.pow(shift), + "computing the square root of a power of two is counting the trailing zeros" + ); + + Self { shift } + } +} + +impl FrameToIndex for AlignedFrame { + #[inline] + fn frame_to_index(&self, desc: UmemDescriptor) -> u64 { + desc.address >> self.shift + } +} + +pub struct UnalignedFrame { + frame_size: u64, +} + +impl UnalignedFrame { + pub fn new(frame_size: u32) -> Self { + let frame_size = frame_size as u64; + debug_assert!(!frame_size.is_power_of_two()); + Self { frame_size } + } +} + +impl FrameToIndex for UnalignedFrame { + #[inline] + fn frame_to_index(&self, desc: UmemDescriptor) -> u64 { + desc.address / self.frame_size + } +} + +/// Converts a frame index into a queue index +pub trait IndexToQueue: Unpin { + fn index_to_queue(&self, index: u64) -> u64; +} + +pub struct AlignedQueue { + mask: u64, +} + +impl AlignedQueue { + pub fn new(queues: usize) -> Self { + let queues = queues as u64; + debug_assert!(queues.is_power_of_two()); + let mask = queues - 1; + Self { mask } + } +} + +impl IndexToQueue for AlignedQueue { + #[inline] + fn index_to_queue(&self, index: u64) -> u64 { + index & self.mask + } +} + +pub struct UnalignedQueue { + queues: u64, +} + +impl UnalignedQueue { + pub fn new(queues: usize) -> Self { + let queues = queues as u64; + debug_assert!(!queues.is_power_of_two()); + Self { queues } + } +} + +impl IndexToQueue for UnalignedQueue { + #[inline] + fn index_to_queue(&self, 
index: u64) -> u64 { + index % self.queues + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::if_xdp::UmemDescriptor; + use bolero::check; + + #[cfg(not(kani))] + fn test_generic( + frame_size: u32, + frame: F, + queues: usize, + index: I, + ) { + let assigner = AssignGeneric { frame, index }; + + let indexes = 0u64..100; + let mut expected_queue = (0..queues as u64).cycle(); + + for desc in indexes.map(|idx| UmemDescriptor { + address: idx * frame_size as u64, + }) { + let expected_queue = expected_queue.next().unwrap(); + for queue in 0..queues as u64 { + let is_expected = queue == expected_queue; + + for offset in [0, 1, 2, (frame_size - 1) as _] { + let mut desc = desc; + desc.address += offset; + let was_assigned = assigner.assign(desc, queue as _); + assert_eq!( + is_expected, was_assigned, + "desc: {desc:?}, expected_queue: {expected_queue}, queue: {queue}" + ); + } + } + } + } + + #[cfg(kani)] + fn test_generic( + frame_size: u32, + frame: F, + queues: usize, + index: I, + ) { + let assigner = AssignGeneric { frame, index }; + + let address: u64 = kani::any(); + let expected_queue = (address / frame_size as u64) % queues as u64; + + let queue: u64 = kani::any(); + kani::assume(queue <= queues as u64); + + let desc = UmemDescriptor { address }; + + let is_expected = queue == expected_queue; + + let was_assigned = assigner.assign(desc, queue); + assert_eq!(is_expected, was_assigned,); + } + + #[test] + #[cfg_attr(kani, kani::proof, kani::unwind(4), kani::solver(kissat))] + fn assignment_test() { + // The kani proof takes about 1m with the current parameters. Increasing any of these + // numbers causes it to take much longer - but I didn't take the time to find out _how_ + // long. Either way, the current bounds should be sufficient to show that the math works. + let frames = if cfg!(kani) { 4u32..=6 } else { 4u32..=100_000 }; + let queues = if cfg!(kani) { 1usize..=4 } else { 1usize..=128 }; + + check!() + .with_generator((frames, queues)) + .cloned() + .for_each(|(frame_size, queues)| { + match (frame_size.is_power_of_two(), queues.is_power_of_two()) { + (true, true) => test_generic( + frame_size, + AlignedFrame::new(frame_size), + queues, + AlignedQueue::new(queues), + ), + (true, false) => test_generic( + frame_size, + AlignedFrame::new(frame_size), + queues, + UnalignedQueue::new(queues), + ), + (false, true) => test_generic( + frame_size, + UnalignedFrame::new(frame_size), + queues, + AlignedQueue::new(queues), + ), + (false, false) => test_generic( + frame_size, + UnalignedFrame::new(frame_size), + queues, + UnalignedQueue::new(queues), + ), + } + }); + } +} diff --git a/tools/xdp/s2n-quic-xdp/src/task/rx.rs b/tools/xdp/s2n-quic-xdp/src/task/rx.rs new file mode 100644 index 0000000000..5da236516d --- /dev/null +++ b/tools/xdp/s2n-quic-xdp/src/task/rx.rs @@ -0,0 +1,407 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::{if_xdp::RxTxDescriptor, ring, socket, syscall}; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use s2n_quic_core::sync::{spsc, worker}; + +/// Polls a RX queue for entries and sends them to the notifier +pub async fn rx(poller: P, rx: ring::Rx, notifier: N) { + Rx { + poller, + rx, + notifier, + } + .await; +} + +/// Polls a socket for pending RX items +pub trait Poller: Unpin { + fn poll Option>( + &mut self, + rx: &mut ring::Rx, + cx: &mut Context, + on_ready: F, + ) -> Poll>; +} + +/// Busy polls a socket +impl Poller for socket::Fd { + #[inline] + fn poll Option>( + &mut self, + rx: &mut ring::Rx, + cx: &mut Context, + mut on_ready: F, + ) -> Poll> { + let _ = syscall::busy_poll(self); + + // wake up the task immediately after + cx.waker().wake_by_ref(); + + // try to acquire entries from the RX queue + let count = rx.acquire(1); + + // we didn't get anything; try again later + if count == 0 { + return Poll::Pending; + } + + // notify the callback that we have some items + if on_ready(rx, cx).is_none() { + return Poll::Ready(Err(())); + } + + Poll::Ready(Ok(())) + } +} + +/// Polling implementation using a worker Receiver +/// +/// This is mostly used in testing. Real-world applications will likely use an actual socket. +impl Poller for worker::Receiver { + #[inline] + fn poll Option>( + &mut self, + rx: &mut ring::Rx, + cx: &mut Context, + mut on_ready: F, + ) -> Poll> { + // limit the number of loops to prevent endless spinning on registering wakers + for iteration in 0..10 { + trace!("iteration {}", iteration); + + // try to acquire work items + match self.poll_acquire(cx) { + Poll::Ready(Some(items)) => { + trace!("acquired {items} items from worker"); + + // try to acquire entries for the queue + let count = rx.acquire(items as _) as usize; + + trace!("acquired {count} items from RX ring"); + + // if we didn't get anything, try to acquire RX entries again + if count == 0 { + continue; + } + + // we have at least one entry so notify the callback + match on_ready(rx, cx) { + Some(actual) => { + trace!("consumed {actual} items"); + + self.finish(actual); + + continue; + } + None => { + trace!("on_ready closed; closing receiver"); + + return Poll::Ready(Err(())); + } + } + } + Poll::Ready(None) => { + trace!("worker sender closed; closing poller"); + + return Poll::Ready(Err(())); + } + Poll::Pending => { + trace!("worker out of items; sleeping"); + + return Poll::Pending; + } + } + } + + // if we got here, we iterated 10 times and need to yield so we don't consume the event + // loop too much + trace!("waking self"); + cx.waker().wake_by_ref(); + + Poll::Pending + } +} + +#[cfg(feature = "tokio")] +/// Polling implementation for an asynchronous socket +impl Poller for tokio::io::unix::AsyncFd { + #[inline] + fn poll Option>( + &mut self, + rx: &mut ring::Rx, + cx: &mut Context, + mut on_ready: F, + ) -> Poll> { + // limit the number of loops to prevent endless spinning on registering wakers + for iteration in 0..10 { + trace!("iteration {}", iteration); + + // query socket readiness through tokio's polling facilities + match self.poll_read_ready(cx) { + Poll::Ready(Ok(mut guard)) => { + // try to acquire entries for the queue + let count = rx.acquire(1) as usize; + + trace!("acquired {count} items from RX ring"); + + // if we didn't get anything, we need to clear readiness and try again + if count == 0 { + guard.clear_ready(); + trace!("clearing socket readiness and trying again"); + continue; + } + + // 
we have at least one entry so notify the callback
+                match on_ready(rx, cx) {
+                    Some(actual) => {
+                        trace!("consumed {actual} items");
+
+                        // if we consumed all of the acquired items we'll need to poll the
+                        // queue again for readiness so we can register a waker.
+                        if actual >= count {
+                            trace!("clearing socket readiness and trying again");
+                            guard.clear_ready();
+                        }
+
+                        continue;
+                    }
+                    None => {
+                        trace!("on_ready closed; closing receiver");
+
+                        return Poll::Ready(Err(()));
+                    }
+                }
+            }
+            Poll::Ready(Err(err)) => {
+                trace!("socket returned an error while polling: {err:?}; closing poller");
+                return Poll::Ready(Err(()));
+            }
+            Poll::Pending => {
+                trace!("ring out of items; sleeping");
+                return Poll::Pending;
+            }
+        }
+    }
+
+    // if we got here, we iterated 10 times and need to yield so we don't consume the event
+    // loop too much
+    trace!("waking self");
+    cx.waker().wake_by_ref();
+
+    Poll::Pending
+    }
+}
+
+/// Notifies an RX worker that new entries are available
+pub trait Notifier: Unpin {
+    fn notify(
+        &mut self,
+        head: &mut [RxTxDescriptor],
+        tail: &mut [RxTxDescriptor],
+        cx: &mut Context,
+    ) -> Option<usize>;
+}
+
+impl Notifier for spsc::Sender<RxTxDescriptor> {
+    #[inline]
+    fn notify(
+        &mut self,
+        head: &mut [RxTxDescriptor],
+        tail: &mut [RxTxDescriptor],
+        cx: &mut Context,
+    ) -> Option<usize> {
+        trace!(
+            "notifying rx queue of {} available items",
+            head.len() + tail.len()
+        );
+
+        match self.poll_slice(cx) {
+            Poll::Ready(Ok(mut slice)) => {
+                trace!("rx queue has capacity of {}", slice.capacity());
+
+                let mut pushed = 0;
+
+                /// copies the provided entries into the RX queue
+                macro_rules! extend {
+                    ($name:ident) => {
+                        if !$name.is_empty() {
+                            let mut iter = $name
+                                .iter()
+                                .map(|v| {
+                                    pushed += 1;
+                                    *v
+                                })
+                                .peekable();
+
+                            while iter.peek().is_some() {
+                                if slice.extend(&mut iter).is_err() {
+                                    trace!("rx queue closed; closing");
+                                    return None;
+                                }
+                            }
+                        }
+                    };
+                }
+
+                extend!(head);
+                extend!(tail);
+
+                trace!("rx queue pushed {pushed} items");
+
+                Some(pushed)
+            }
+            Poll::Ready(Err(_)) => {
+                trace!("rx queue closed; closing");
+                None
+            }
+            Poll::Pending => {
+                trace!("no rx capacity available; sleeping");
+                Some(0)
+            }
+        }
+    }
+}
+
+struct Rx<P: Poller, N: Notifier> {
+    poller: P,
+    rx: ring::Rx,
+    notifier: N,
+}
+
+impl<P: Poller, N: Notifier> Future for Rx<P, N> {
+    type Output = ();
+
+    #[inline]
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
+        let Self {
+            poller,
+            rx,
+            notifier,
+        } = self.get_mut();
+
+        trace!("polling rx");
+
+        match poller.poll(rx, cx, |rx, cx| {
+            let (head, tail) = rx.data();
+            let len = head.len() + tail.len();
+
+            let actual = notifier.notify(head, tail, cx)?;
+
+            debug_assert!(
+                actual <= len,
+                "the number of actual items should not exceed what was acquired"
+            );
+
+            let len = len.min(actual);
+
+            // release the entries back to the RX ring
+            rx.release(len as _);
+
+            Some(len)
+        }) {
+            Poll::Ready(Ok(())) => Poll::Pending,
+            Poll::Ready(Err(_)) => Poll::Ready(()),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{
+        if_xdp::UmemDescriptor,
+        task::testing::{random_delay, QUEUE_SIZE, TEST_ITEMS},
+    };
+    use rand::prelude::*;
+    use tokio::sync::oneshot;
+
+    #[tokio::test]
+    async fn rx_test() {
+        let channel_size = QUEUE_SIZE;
+        let expected_total = TEST_ITEMS as u64;
+
+        let (rx_send, mut rx_recv) = spsc::channel(channel_size);
+        let (ring_rx, mut ring_tx) = ring::testing::rx_tx(channel_size as u32);
+        let (worker_send, worker_recv) = worker::channel();
+        let (done_send, done_recv) = oneshot::channel();
+
+        tokio::spawn(rx(worker_recv, ring_rx, rx_send));
+
+        tokio::spawn(async move {
+            let mut addresses = (0..expected_total)
+                .map(|address| UmemDescriptor { address }.with_len(0))
+                .peekable();
+
+            let mut total = 0;
+
+            while addresses.peek().is_some() {
+                let count = ring_tx.acquire(1);
+
+                if count == 0 {
+                    trace!("no capacity in TX ring; sleeping");
+                    random_delay().await;
+                    continue;
+                }
+
+                let batch_size = thread_rng().gen_range(1..=count);
+                let (head, tail) = ring_tx.data();
+
+                trace!("submitting {batch_size} items to TX ring");
+
+                let mut sent = 0;
+                for (desc, dest) in (&mut addresses)
+                    .take(batch_size as _)
+                    .zip(head.iter_mut().chain(tail))
+                {
+                    trace!("send entry address: {}", desc.address);
+
+                    *dest = desc;
+                    sent += 1;
+                }
+
+                ring_tx.release(sent as _);
+                worker_send.submit(sent as _);
+                total += sent;
+
+                random_delay().await;
+            }
+
+            assert_eq!(total, expected_total);
+            trace!("sender shutting down");
+        });
+
+        tokio::spawn(async move {
+            let mut total = 0;
+
+            while rx_recv.acquire().await.is_ok() {
+                let mut slice = rx_recv.slice();
+
+                trace!("waking up receiver with {} items", slice.len());
+
+                while let Some(desc) = slice.pop() {
+                    trace!("recv entry address: {}", desc.address);
+
+                    assert_eq!(
+                        desc.address, total,
+                        "address does not match the expected value"
+                    );
+                    total += 1;
+                }
+            }
+
+            trace!("receiver shutting down");
+            done_send.send(total).unwrap();
+        });
+
+        let actual_total = done_recv.await.unwrap();
+
+        assert_eq!(expected_total, actual_total);
+    }
+}
diff --git a/tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs b/tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs
new file mode 100644
index 0000000000..ba1e6475ca
--- /dev/null
+++ b/tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs
@@ -0,0 +1,298 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::{if_xdp::UmemDescriptor, ring};
+use core::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+use s2n_quic_core::{
+    slice::vectored_copy,
+    sync::{spsc, worker},
+};
+
+type Receiver = spsc::Receiver<UmemDescriptor>;
+
+/// Takes descriptors from RX workers and forwards them on to the fill queue
+pub async fn rx_to_fill<N: Notifier>(mut rx_queues: Vec<Receiver>, fill: ring::Fill, notify: N) {
+    match rx_queues.len() {
+        0 => panic!("invalid rx queues"),
+        1 => {
+            trace!("using single queue mode");
+            RxToFillRing {
+                rxs: rx_queues.pop().unwrap(),
+                fill,
+                notify,
+            }
+            .await;
+        }
+        _ => {
+            trace!("using multi-queue mode with {} queues", rx_queues.len());
+            RxToFillRing {
+                rxs: rx_queues,
+                fill,
+                notify,
+            }
+            .await;
+        }
+    }
+}
+
+/// Notifies the implementor of emitted packets on the fill queue
+pub trait Notifier: Unpin {
+    fn notify(&mut self, sent: u32, fill: &mut ring::Fill);
+}
+
+impl Notifier for () {
+    #[inline]
+    fn notify(&mut self, _send: u32, _fill: &mut ring::Fill) {
+        // Nothing is usually needed here. The OS will pick up available entries on RX.
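+        //
+        // Caveat (an assumption, not exercised by these tests): if the socket
+        // was bound with the `XDP_USE_NEED_WAKEUP` flag, the kernel may need an
+        // explicit wakeup (e.g. a `recvfrom` syscall) before it consumes fill
+        // entries again, which is exactly what a non-unit `Notifier` is for.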
+ } +} + +impl Notifier for worker::Sender { + #[inline] + fn notify(&mut self, send: u32, _fill: &mut ring::Fill) { + self.submit(send as _); + } +} + +/// A group of RX queues that are responsible for processing packets +trait Rxs: Unpin { + /// Iterates over all of the queues in the group + fn for_each(&mut self, f: F); + /// Returns the number of queues + fn len(&self) -> usize; +} + +impl Rxs for Vec { + #[inline] + fn for_each(&mut self, mut f: F) { + for s in self.iter_mut() { + f(s); + } + } + + #[inline] + fn len(&self) -> usize { + Vec::len(self) + } +} + +impl Rxs for Receiver { + #[inline] + fn for_each(&mut self, mut f: F) { + f(self); + } + + #[inline] + fn len(&self) -> usize { + 1 + } +} + +struct RxToFillRing { + rxs: R, + fill: ring::Fill, + notify: N, +} + +impl Future for RxToFillRing { + type Output = (); + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { + let Self { rxs, fill, notify } = self.get_mut(); + + trace!("polling rx to fill ring"); + + let mut sent = 0; + let mut closed = 0; + + let mut has_fill_capacity = true; + + rxs.for_each(|rx| { + // we need to loop until we can't read anything so the waker stays registered + while has_fill_capacity { + trace!("polling rx queue"); + + match rx.poll_slice(cx) { + Poll::Ready(Ok(mut slice)) => { + let (from_a, from_b) = slice.peek(); + let expected_len = (from_a.len() + from_b.len()) as u32; + debug_assert_ne!(expected_len, 0); + + trace!("rx queue has {} items available", expected_len); + + // acquire entries to submit to the fill queue + let actual_len = fill.acquire(expected_len); + + trace!("acquired {actual_len} items from the Fill queue"); + + let (to_a, to_b) = fill.data(); + + // copy all of the items from the worker's queue into the fill queue + let copied_len = vectored_copy(&[from_a, from_b], &mut [to_a, to_b]); + + trace!("moved {copied_len} items into the Fill queue"); + + // release all of the entries we copied + slice.release(copied_len); + fill.release(copied_len as _); + + sent += copied_len as u32; + + // the fill queue didn't have enough capacity for us to fill. make a last + // effort to acquire capacity or try again later. 
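+                        //
+                        // Using `u32::MAX` as the watermark forces `acquire` to
+                        // re-read the shared consumer cursor rather than reuse its
+                        // cached copy (see `Cursor::acquire_producer`), picking up
+                        // any entries released since the last synchronization.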
+ if expected_len > actual_len { + if fill.acquire(u32::MAX) > 0 { + // we got something; keep filling it + continue; + } + + // we didn't get anything; yield and wake up immediately + cx.waker().wake_by_ref(); + has_fill_capacity = false; + break; + } + } + Poll::Ready(Err(_)) => { + trace!("rx queue closed"); + closed += 1; + break; + } + Poll::Pending => { + // we cleared the queue and registered our waker so go to the next queue + trace!("rx queue empty"); + break; + } + } + } + }); + + // submit the number of items that we sent to the fill queue + trace!("notifying that {sent} items were submitted to the fill queue"); + notify.notify(sent, fill); + + // if all of the queues are closed then shut down the task + if closed == rxs.len() { + trace!("all RX senders are closed; shutting down"); + return Poll::Ready(()); + } + + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + if_xdp::UmemDescriptor, + task::testing::{random_delay, QUEUE_SIZE, TEST_ITEMS}, + }; + use rand::prelude::*; + use tokio::sync::oneshot; + + async fn execute_test(workers: usize) { + let channel_size = QUEUE_SIZE; + let worker_total = TEST_ITEMS / workers; + let expected_total = worker_total * workers; + + let mut worker_channels = vec![]; + for idx in 0..workers { + let (mut tx_send, tx_recv) = spsc::channel(channel_size); + worker_channels.push(tx_recv); + + tokio::spawn(async move { + let mut addresses = (idx as u64..) + .step_by(workers) + .take(worker_total) + .map(|address| UmemDescriptor { address }) + .peekable(); + + while addresses.peek().is_some() { + if tx_send.acquire().await.is_err() { + trace!("TX receiver closed; shutting down"); + return; + } + + let mut slice = tx_send.slice(); + + let batch_size = thread_rng().gen_range(1..=slice.capacity()); + + trace!("TX batch size set to {batch_size}"); + + for desc in (&mut addresses).take(batch_size) { + trace!("sending address {}", desc.address); + let _ = slice.push(desc); + } + + random_delay().await; + } + + trace!("all items sent; shutting down"); + }); + } + + let (mut ring_rx, ring_tx) = ring::testing::completion_fill(channel_size as u32); + let (worker_send, mut worker_recv) = worker::channel(); + let (done_send, done_recv) = oneshot::channel(); + + tokio::spawn(rx_to_fill(worker_channels, ring_tx, worker_send)); + + tokio::spawn(async move { + let mut totals: Vec<_> = (0..workers as u64).collect(); + let mut total = 0; + + while let Some(credits) = worker_recv.acquire().await { + trace!("acquired {credits} worker credits"); + + let count = ring_rx.acquire(1); + + trace!("acquired {count} RX ring entries"); + + let count = credits.min(count as _); + + if count == 0 { + continue; + } + + let (head, tail) = ring_rx.data(); + for entry in head.iter().chain(tail.iter()).take(count) { + trace!("receiving address {}", entry.address); + + let worker = entry.address as usize % workers; + let worker_total = &mut totals[worker]; + assert_eq!(*worker_total, entry.address); + *worker_total += workers as u64; + } + + trace!("received {count} items"); + + ring_rx.release(count as _); + worker_recv.finish(count as _); + total += count as u64; + } + + trace!("receiver finished; shutting down"); + + done_send.send(total).unwrap(); + }); + + let actual_total = done_recv.await.unwrap(); + + assert_eq!(expected_total as u64, actual_total); + } + + #[tokio::test] + async fn single_worker() { + execute_test(1).await; + } + + #[tokio::test] + async fn multiple_worker() { + execute_test(4).await; + } +} diff --git 
a/tools/xdp/s2n-quic-xdp/src/task/testing.rs b/tools/xdp/s2n-quic-xdp/src/task/testing.rs new file mode 100644 index 0000000000..58336fa3f3 --- /dev/null +++ b/tools/xdp/s2n-quic-xdp/src/task/testing.rs @@ -0,0 +1,24 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use core::time::Duration; +use rand::prelude::*; +use tokio::time; + +pub async fn random_delay() { + let delay = thread_rng().gen_range(0..100); + if delay > 0 { + let delay = Duration::from_micros(delay); + trace!("sleeping for {delay:?}"); + time::sleep(delay).await; + } +} + +/// The number of items to send through the test queues +pub const TEST_ITEMS: usize = 10_000; + +/// The configured size of each test queue. +/// +/// This value is purposefully low to more frequently trigger corner cases of +/// queues wrapping and/or getting full. +pub const QUEUE_SIZE: usize = 16; diff --git a/tools/xdp/s2n-quic-xdp/src/task/tx.rs b/tools/xdp/s2n-quic-xdp/src/task/tx.rs new file mode 100644 index 0000000000..82c35d0abc --- /dev/null +++ b/tools/xdp/s2n-quic-xdp/src/task/tx.rs @@ -0,0 +1,195 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{if_xdp::RxTxDescriptor, ring, socket, syscall}; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use s2n_quic_core::{ + slice::vectored_copy, + sync::{spsc, worker}, +}; + +/// Takes a queue of descriptors to be transmitted on a socket +pub async fn tx( + outgoing: spsc::Receiver, + tx: ring::Tx, + notifier: N, + worker: worker::Sender, +) { + Tx { + outgoing, + tx, + notifier, + worker, + } + .await; +} + +/// Notifies the implementor of progress on the TX ring +pub trait Notifier: Unpin { + fn notify(&mut self); +} + +impl Notifier for () { + #[inline] + fn notify(&mut self) { + // nothing to do + } +} + +impl Notifier for socket::Fd { + #[inline] + fn notify(&mut self) { + let result = syscall::wake_tx(self); + + trace!("waking tx for progress {result:?}"); + } +} + +struct Tx { + outgoing: spsc::Receiver, + tx: ring::Tx, + notifier: N, + worker: worker::Sender, +} + +impl Future for Tx { + type Output = (); + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { + let Self { + outgoing, + tx, + notifier, + worker, + } = self.get_mut(); + + trace!("polling tx"); + + for iteration in 0..10 { + trace!("iteration {}", iteration); + + let count = match outgoing.poll_slice(cx) { + Poll::Ready(Ok(slice)) => slice.len() as u32, + Poll::Ready(Err(_)) => { + trace!("tx queue is closed; shutting down"); + return Poll::Ready(()); + } + Poll::Pending => { + trace!("tx queue out of items; sleeping"); + return Poll::Pending; + } + }; + + trace!("acquired {count} items from tx queues"); + + let count = tx.acquire(count); + + trace!("acquired {count} items from TX ring"); + + if count == 0 { + notifier.notify(); + continue; + } + + let mut outgoing = outgoing.slice(); + let (rx_head, rx_tail) = outgoing.peek(); + let (tx_head, tx_tail) = tx.data(); + + let count = vectored_copy(&[rx_head, rx_tail], &mut [tx_head, tx_tail]); + + trace!("copied {count} items into TX ring"); + + if count > 0 { + tx.release(count as _); + outgoing.release(count); + worker.submit(count); + } + + if tx.needs_wakeup() { + trace!("TX ring needs wakeup"); + notifier.notify(); + } + } + + // if we got here, we iterated 10 times and need to yield so we don't consume the event + // loop too much + trace!("waking self"); + cx.waker().wake_by_ref(); + 
Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + if_xdp::UmemDescriptor, + task::testing::{random_delay, QUEUE_SIZE, TEST_ITEMS}, + }; + use rand::prelude::*; + use tokio::sync::oneshot; + + #[tokio::test] + async fn tx_test() { + let channel_size = QUEUE_SIZE; + let expected_total = TEST_ITEMS as u64; + + let (mut tx_send, tx_recv) = spsc::channel(channel_size); + let (mut ring_rx, ring_tx) = ring::testing::rx_tx(channel_size as u32); + let (worker_send, mut worker_recv) = worker::channel(); + let (done_send, done_recv) = oneshot::channel(); + + tokio::spawn(tx(tx_recv, ring_tx, (), worker_send)); + + tokio::spawn(async move { + let mut addresses = (0..expected_total) + .map(|address| UmemDescriptor { address }.with_len(0)) + .peekable(); + + while addresses.peek().is_some() { + if tx_send.acquire().await.is_err() { + return; + } + + let batch_size = thread_rng().gen_range(1..channel_size); + let mut slice = tx_send.slice(); + + let _ = slice.extend(&mut (&mut addresses).take(batch_size)); + + random_delay().await; + } + }); + + tokio::spawn(async move { + let mut total = 0; + + while let Some(credits) = worker_recv.acquire().await { + let actual = ring_rx.acquire(1); + + if actual == 0 { + continue; + } + + let (head, tail) = ring_rx.data(); + for entry in head.iter().chain(tail.iter()) { + assert_eq!(entry.address, total); + total += 1; + } + + ring_rx.release(actual); + worker_recv.finish(credits); + } + + done_send.send(total).unwrap(); + }); + + let actual_total = done_recv.await.unwrap(); + + assert_eq!(expected_total, actual_total); + } +} From 75a1e3527887b622bb4c1e5cd7e249ec53f6137f Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Thu, 27 Apr 2023 12:08:35 -0600 Subject: [PATCH 2/5] fix cursor wrapping --- tools/xdp/s2n-quic-xdp/Cargo.toml | 1 + tools/xdp/s2n-quic-xdp/src/ring/cursor.rs | 138 ++++++++++++++-------- 2 files changed, 92 insertions(+), 47 deletions(-) diff --git a/tools/xdp/s2n-quic-xdp/Cargo.toml b/tools/xdp/s2n-quic-xdp/Cargo.toml index 3fd0e06978..037aad2c1a 100644 --- a/tools/xdp/s2n-quic-xdp/Cargo.toml +++ b/tools/xdp/s2n-quic-xdp/Cargo.toml @@ -16,4 +16,5 @@ tokio = { version = "1", optional = true } [dev-dependencies] bolero = "0.9" rand = "0.8" +s2n-quic-core = { path = "../../../quic/s2n-quic-core", version = "0.19", features = ["testing"] } tokio = { version = "1", features = ["full"] } diff --git a/tools/xdp/s2n-quic-xdp/src/ring/cursor.rs b/tools/xdp/s2n-quic-xdp/src/ring/cursor.rs index aa2feb819f..e48843c5d2 100644 --- a/tools/xdp/s2n-quic-xdp/src/ring/cursor.rs +++ b/tools/xdp/s2n-quic-xdp/src/ring/cursor.rs @@ -108,7 +108,9 @@ impl Cursor { // See // https://github.com/xdp-project/xdp-tools/blob/a76e7a2b156b8cfe38992206abe9df1df0a29e38/headers/xdp/xsk.h#L99-L104 self.cached_consumer += self.size; - self.cached_len = self.cached_producer_len() + self.cached_len = self.cached_producer_len(); + + debug_assert!(self.cached_len <= self.size); } /// Returns a reference to the producer atomic cursor @@ -148,7 +150,7 @@ impl Cursor { // Our cached copy has the size added so we also need to add the size here when comparing // // See `Self::init_producer` for more details - new_value += self.size; + new_value = new_value.wrapping_add(self.size); if self.cached_consumer.0 == new_value { return free; @@ -158,6 +160,8 @@ impl Cursor { self.cached_len = self.cached_producer_len(); + debug_assert!(self.cached_len <= self.size); + self.cached_len } @@ -199,6 +203,9 @@ impl Cursor { } self.cached_producer += len; self.cached_len 
-= len; + + debug_assert!(self.cached_len <= self.size); + self.producer().fetch_add(len, Ordering::Release); } @@ -226,6 +233,8 @@ impl Cursor { self.cached_len = self.cached_consumer_len(); + debug_assert!(self.cached_len <= self.size); + self.cached_len } @@ -267,6 +276,9 @@ impl Cursor { } self.cached_consumer += len; self.cached_len -= len; + + debug_assert!(self.cached_len <= self.size); + self.consumer().fetch_add(len, Ordering::Release); } @@ -420,26 +432,28 @@ mod tests { } } - fn model(power_of_two: u8, ops: &[Op]) { - let size = 1 << power_of_two; + fn stack_cursors(init_cursor: u32, desc: &mut [T], exec: F) -> R + where + T: fmt::Debug + Copy, + F: FnOnce(&mut Cursor, &mut Cursor) -> R, + { + let size = desc.len() as u32; + debug_assert!(size.is_power_of_two()); let mask = size - 1; - let producer_v = UnsafeCell::new(AtomicU32::new(0)); - let consumer_v = UnsafeCell::new(AtomicU32::new(0)); - let desc = UnsafeCell::new(vec![u32::MAX; size as usize]); + let producer_v = UnsafeCell::new(AtomicU32::new(init_cursor)); + let consumer_v = UnsafeCell::new(AtomicU32::new(init_cursor)); + let desc = UnsafeCell::new(desc); let producer_v = producer_v.get(); let consumer_v = consumer_v.get(); - let desc = unsafe { (&mut *desc.get()).as_mut_ptr() as *mut _ }; + let desc = unsafe { (*desc.get()).as_mut_ptr() as *mut _ }; - let mut oracle = Oracle { - size, - producer: size, - ..Default::default() - }; + let cached_consumer = Wrapping(init_cursor); + let cached_producer = Wrapping(init_cursor); - let mut producer: Cursor = Cursor { - cached_consumer: Wrapping(0), - cached_producer: Wrapping(0), + let mut producer: Cursor = Cursor { + cached_consumer, + cached_producer, cached_len: 0, size, producer: NonNull::new(producer_v).unwrap(), @@ -454,9 +468,11 @@ mod tests { producer.init_producer(); } - let mut consumer: Cursor = Cursor { - cached_consumer: Wrapping(0), - cached_producer: Wrapping(0), + assert_eq!(producer.acquire_producer(u32::MAX), size); + + let mut consumer: Cursor = Cursor { + cached_consumer, + cached_producer, cached_len: 0, size, producer: NonNull::new(producer_v).unwrap(), @@ -469,42 +485,70 @@ mod tests { assert_eq!(consumer.acquire_consumer(u32::MAX), 0); - for op in ops.iter().copied() { - oracle.fill_producer(unsafe { producer.producer_data() }); + exec(&mut producer, &mut consumer) + } - match op { - Op::ConsumerAcquire(count) => { - let actual = consumer.acquire_consumer(count as _); - oracle.acquire_consumer(actual); - } - Op::ConsumerRelease(count) => { - let oracle_count = oracle.release_consumer(count); - consumer.release_consumer(oracle_count); - } - Op::ProducerAcquire(count) => { - let actual = producer.acquire_producer(count as _); - oracle.acquire_producer(actual); + fn model(power_of_two: u8, init_cursor: u32, ops: &[Op]) { + let size = (1 << power_of_two) as u32; + + #[cfg(not(kani))] + let mut desc = vec![u32::MAX; size as usize]; + + #[cfg(kani)] + let mut desc = &mut [u32::MAX; (1 << MAX_POWER_OF_TWO) as usize][..size as usize]; + + stack_cursors(init_cursor, &mut desc, |producer, consumer| { + let mut oracle = Oracle { + size, + producer: size, + ..Default::default() + }; + + for op in ops.iter().copied() { + oracle.fill_producer(unsafe { producer.producer_data() }); + + match op { + Op::ConsumerAcquire(count) => { + let actual = consumer.acquire_consumer(count as _); + oracle.acquire_consumer(actual); + } + Op::ConsumerRelease(count) => { + let oracle_count = oracle.release_consumer(count); + consumer.release_consumer(oracle_count); + } + 
Op::ProducerAcquire(count) => { + let actual = producer.acquire_producer(count as _); + oracle.acquire_producer(actual); + } + Op::ProducerRelease(count) => { + let oracle_count = oracle.release_producer(count); + producer.release_producer(oracle_count); + } } - Op::ProducerRelease(count) => { - let oracle_count = oracle.release_producer(count); - producer.release_producer(oracle_count); - } - } - oracle.validate_consumer(unsafe { consumer.consumer_data() }); - } + oracle.validate_consumer(unsafe { consumer.consumer_data() }); + } - // final assertions - let actual = consumer.acquire_consumer(u32::MAX); - oracle.acquire_consumer(actual); - let data = unsafe { consumer.consumer_data() }; - oracle.validate_consumer(data); + // final assertions + let actual = consumer.acquire_consumer(u32::MAX); + oracle.acquire_consumer(actual); + let data = unsafe { consumer.consumer_data() }; + oracle.validate_consumer(data); + }); } + #[cfg(not(kani))] + type Ops = Vec; + #[cfg(kani)] + type Ops = s2n_quic_core::testing::InlineVec; + + const MAX_POWER_OF_TWO: u8 = if cfg!(kani) { 2 } else { 10 }; + #[test] + #[cfg_attr(kani, kani::proof, kani::unwind(5), kani::solver(kissat))] fn oracle_test() { check!() - .with_generator((1..=10, gen::>())) - .for_each(|(power_of_two, ops)| model(*power_of_two, ops)); + .with_generator((1..=MAX_POWER_OF_TWO, gen(), gen::())) + .for_each(|(power_of_two, init_cursor, ops)| model(*power_of_two, *init_cursor, ops)); } } From 0ad5d21396f50296ed143f730a989a77fd7898e5 Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Thu, 27 Apr 2023 17:03:43 -0600 Subject: [PATCH 3/5] add tests for large queue sizes --- .github/workflows/ci.yml | 5 ++- .../s2n-quic-xdp/src/task/completion_to_tx.rs | 32 +++++++++++++------ tools/xdp/s2n-quic-xdp/src/task/rx.rs | 16 +++++++--- tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs | 23 +++++++++---- tools/xdp/s2n-quic-xdp/src/task/testing.rs | 5 ++- tools/xdp/s2n-quic-xdp/src/task/tx.rs | 16 +++++++--- 6 files changed, 71 insertions(+), 26 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 56f4f10e71..7f76a78ce6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -666,6 +666,9 @@ jobs: kani: runs-on: ubuntu-latest + strategy: + matrix: + crate: [quic/s2n-quic-core, tools/xdp/s2n-quic-xdp] steps: - uses: actions/checkout@v3 with: @@ -674,7 +677,7 @@ jobs: - name: Kani run uses: model-checking/kani-github-action@v0.26 with: - working-directory: quic/s2n-quic-core + working-directory: ${{ matrix.crate }} args: --tests dhat: diff --git a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs index 8431bd0b75..aa54f1c9f3 100644 --- a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs +++ b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs @@ -325,13 +325,12 @@ mod tests { use super::*; use crate::{ if_xdp::UmemDescriptor, - task::testing::{random_delay, QUEUE_SIZE, TEST_ITEMS}, + task::testing::{random_delay, QUEUE_SIZE_LARGE, QUEUE_SIZE_SMALL, TEST_ITEMS}, }; use rand::prelude::*; use tokio::sync::oneshot; - async fn execute_test(workers: usize, frame_size: u32) { - let channel_size = QUEUE_SIZE; + async fn execute_test(workers: usize, frame_size: u32, channel_size: usize) { let worker_total = TEST_ITEMS / workers; let expected_total = worker_total * workers; @@ -440,17 +439,32 @@ mod tests { } #[tokio::test] - async fn single_worker() { - execute_test(1, 4096).await; + async fn single_worker_small_test() { + execute_test(1, 4096, 
+        execute_test(1, 4096, QUEUE_SIZE_SMALL).await;
     }
 
     #[tokio::test]
-    async fn multiple_worker_aligned() {
-        execute_test(4, 16).await;
+    async fn single_worker_large_test() {
+        execute_test(1, 4096, QUEUE_SIZE_LARGE).await;
     }
 
     #[tokio::test]
-    async fn multiple_worker_unaligned() {
-        execute_test(4, 17).await;
+    async fn multiple_worker_aligned_small_test() {
+        execute_test(4, 16, QUEUE_SIZE_SMALL).await;
+    }
+
+    #[tokio::test]
+    async fn multiple_worker_aligned_large_test() {
+        execute_test(4, 16, QUEUE_SIZE_LARGE).await;
+    }
+
+    #[tokio::test]
+    async fn multiple_worker_unaligned_small_test() {
+        execute_test(4, 17, QUEUE_SIZE_SMALL).await;
+    }
+
+    #[tokio::test]
+    async fn multiple_worker_unaligned_large_test() {
+        execute_test(4, 17, QUEUE_SIZE_LARGE).await;
     }
 }
diff --git a/tools/xdp/s2n-quic-xdp/src/task/rx.rs b/tools/xdp/s2n-quic-xdp/src/task/rx.rs
index 5da236516d..fe6d616d88 100644
--- a/tools/xdp/s2n-quic-xdp/src/task/rx.rs
+++ b/tools/xdp/s2n-quic-xdp/src/task/rx.rs
@@ -317,14 +317,12 @@ mod tests {
     use super::*;
     use crate::{
         if_xdp::UmemDescriptor,
-        task::testing::{random_delay, QUEUE_SIZE, TEST_ITEMS},
+        task::testing::{random_delay, QUEUE_SIZE_LARGE, QUEUE_SIZE_SMALL, TEST_ITEMS},
     };
     use rand::prelude::*;
     use tokio::sync::oneshot;
 
-    #[tokio::test]
-    async fn rx_test() {
-        let channel_size = QUEUE_SIZE;
+    async fn execute_test(channel_size: usize) {
         let expected_total = TEST_ITEMS as u64;
 
         let (rx_send, mut rx_recv) = spsc::channel(channel_size);
@@ -404,4 +402,14 @@ mod tests {
 
         assert_eq!(expected_total, actual_total);
     }
+
+    #[tokio::test]
+    async fn rx_small_test() {
+        execute_test(QUEUE_SIZE_SMALL).await;
+    }
+
+    #[tokio::test]
+    async fn rx_large_test() {
+        execute_test(QUEUE_SIZE_LARGE).await;
+    }
 }
diff --git a/tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs b/tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs
index ba1e6475ca..727d85bd47 100644
--- a/tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs
+++ b/tools/xdp/s2n-quic-xdp/src/task/rx_to_fill.rs
@@ -190,13 +190,12 @@ mod tests {
     use super::*;
     use crate::{
         if_xdp::UmemDescriptor,
-        task::testing::{random_delay, QUEUE_SIZE, TEST_ITEMS},
+        task::testing::{random_delay, QUEUE_SIZE_LARGE, QUEUE_SIZE_SMALL, TEST_ITEMS},
     };
     use rand::prelude::*;
     use tokio::sync::oneshot;
 
-    async fn execute_test(workers: usize) {
-        let channel_size = QUEUE_SIZE;
+    async fn execute_test(workers: usize, channel_size: usize) {
         let worker_total = TEST_ITEMS / workers;
         let expected_total = worker_total * workers;
 
@@ -287,12 +286,22 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn single_worker() {
-        execute_test(1).await;
+    async fn single_worker_small_test() {
+        execute_test(1, QUEUE_SIZE_SMALL).await;
     }
 
     #[tokio::test]
-    async fn multiple_worker() {
-        execute_test(4).await;
+    async fn single_worker_large_test() {
+        execute_test(1, QUEUE_SIZE_LARGE).await;
+    }
+
+    #[tokio::test]
+    async fn multiple_worker_small_test() {
+        execute_test(4, QUEUE_SIZE_SMALL).await;
+    }
+
+    #[tokio::test]
+    async fn multiple_worker_large_test() {
+        execute_test(4, QUEUE_SIZE_LARGE).await;
     }
 }
diff --git a/tools/xdp/s2n-quic-xdp/src/task/testing.rs b/tools/xdp/s2n-quic-xdp/src/task/testing.rs
index 58336fa3f3..5143340e4f 100644
--- a/tools/xdp/s2n-quic-xdp/src/task/testing.rs
+++ b/tools/xdp/s2n-quic-xdp/src/task/testing.rs
@@ -21,4 +21,7 @@ pub const TEST_ITEMS: usize = 10_000;
 ///
 /// This value is purposefully low to more frequently trigger corner cases of
 /// queues wrapping and/or getting full.
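+/// (With TEST_ITEMS at 10_000, a 16-entry queue wraps roughly 625 times per run.)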
-pub const QUEUE_SIZE: usize = 16;
+pub const QUEUE_SIZE_SMALL: usize = 16;
+
+/// Production queues are unlikely to be larger than this value
+pub const QUEUE_SIZE_LARGE: usize = 4096;
diff --git a/tools/xdp/s2n-quic-xdp/src/task/tx.rs b/tools/xdp/s2n-quic-xdp/src/task/tx.rs
index 82c35d0abc..9348849a20 100644
--- a/tools/xdp/s2n-quic-xdp/src/task/tx.rs
+++ b/tools/xdp/s2n-quic-xdp/src/task/tx.rs
@@ -129,14 +129,12 @@ mod tests {
     use super::*;
     use crate::{
         if_xdp::UmemDescriptor,
-        task::testing::{random_delay, QUEUE_SIZE, TEST_ITEMS},
+        task::testing::{random_delay, QUEUE_SIZE_LARGE, QUEUE_SIZE_SMALL, TEST_ITEMS},
     };
     use rand::prelude::*;
     use tokio::sync::oneshot;
 
-    #[tokio::test]
-    async fn tx_test() {
-        let channel_size = QUEUE_SIZE;
+    async fn execute_test(channel_size: usize) {
         let expected_total = TEST_ITEMS as u64;
 
         let (mut tx_send, tx_recv) = spsc::channel(channel_size);
@@ -192,4 +190,14 @@ mod tests {
 
         assert_eq!(expected_total, actual_total);
     }
+
+    #[tokio::test]
+    async fn tx_small_test() {
+        execute_test(QUEUE_SIZE_SMALL).await;
+    }
+
+    #[tokio::test]
+    async fn tx_large_test() {
+        execute_test(QUEUE_SIZE_LARGE).await;
+    }
 }

From 919e7cecfb8a657b39a1f60dd96ef8566cb7e60e Mon Sep 17 00:00:00 2001
From: Cameron Bytheway
Date: Mon, 1 May 2023 21:06:30 -0600
Subject: [PATCH 4/5] more feedback

---
 tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs |  6 ++++--
 .../src/task/completion_to_tx/assign.rs             | 13 +++++++------
 tools/xdp/s2n-quic-xdp/src/task/rx.rs               |  2 ++
 3 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs
index aa54f1c9f3..66abdf037e 100644
--- a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs
+++ b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs
@@ -33,7 +33,7 @@ pub async fn completion_to_tx(
         frame_size.is_power_of_two(),
         tx_queues.len().is_power_of_two(),
     ) {
-        (0, _, _) => panic!("invalid tx_queues size"),
+        (0, _, _) => panic!("invalid must be non-zero length"),
         (1, _, _) => {
             trace!("using single queue mode");
             CompletionRingToTx {
@@ -101,7 +101,9 @@ pub async fn completion_to_tx(
 
 /// Polls the completion queue for progress
 pub trait Poller: Unpin {
+    /// Polls the completion queue for progress
     fn poll(&mut self, comp: &mut ring::Completion, cx: &mut Context) -> Poll<Option<u32>>;
+    /// Releases `count` number of entries
     fn release(&mut self, comp: &mut ring::Completion, count: usize);
 }
 
@@ -271,7 +273,7 @@ impl Future for CompletionRingToTx {
diff --git a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx/assign.rs b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx/assign.rs
--- a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx/assign.rs
+++ b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx/assign.rs
 pub trait Assign {
-    fn assign(&self, desc: UmemDescriptor, idx: u64) -> bool;
+    fn is_assigned(&self, desc: UmemDescriptor, idx: u64) -> bool;
 }
 
 impl Assign for () {
     #[inline]
-    fn assign(&self, _desc: UmemDescriptor, idx: u64) -> bool {
+    fn is_assigned(&self, _desc: UmemDescriptor, idx: u64) -> bool {
         debug_assert_eq!(
             idx, 0,
             "assignment mode should only be used for single queue workflows"
         );
@@ -33,7 +33,7 @@ pub struct AssignGeneric {
 impl Assign for AssignGeneric {
     #[inline]
-    fn assign(&self, desc: UmemDescriptor, idx: u64) -> bool {
+    fn is_assigned(&self, desc: UmemDescriptor, idx: u64) -> bool {
         let v = self.frame.frame_to_index(desc);
         let v = self.index.index_to_queue(v);
         v == idx
@@ -51,7 +51,8 @@ pub struct AlignedFrame {
 impl AlignedFrame {
     pub fn new(frame_size: u32) -> Self {
-        debug_assert!(frame_size.is_power_of_two());
+        assert!(frame_size.is_power_of_two());
+        assert!(frame_size > 2, "cannot take the square root of 2");
 
         let shift = frame_size.trailing_zeros();
 
@@ -163,7 +164,7 @@ mod tests {
                 for offset in [0, 1, 2, (frame_size - 1) as _] {
                     let mut desc = desc;
                     desc.address += offset;
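+                    // every offset inside a single frame should map to the
+                    // same queue as the frame's base address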
-                    let was_assigned = assigner.assign(desc, queue as _);
+                    let was_assigned = assigner.is_assigned(desc, queue as _);
                     assert_eq!(
                         is_expected, was_assigned,
                         "desc: {desc:?}, expected_queue: {expected_queue}, queue: {queue}"
@@ -192,7 +193,7 @@
 
                 let is_expected = queue == expected_queue;
 
-                let was_assigned = assigner.assign(desc, queue);
+                let was_assigned = assigner.is_assigned(desc, queue);
 
                 assert_eq!(is_expected, was_assigned,);
             }
diff --git a/tools/xdp/s2n-quic-xdp/src/task/rx.rs b/tools/xdp/s2n-quic-xdp/src/task/rx.rs
index fe6d616d88..63adbfafa0 100644
--- a/tools/xdp/s2n-quic-xdp/src/task/rx.rs
+++ b/tools/xdp/s2n-quic-xdp/src/task/rx.rs
@@ -298,6 +298,8 @@ impl Future for Rx {
                 "the number of actual items should not exceed what was acquired"
             );
 
+            // While we have a `debug_assert` above, this is being overly defensive just in case.
+            // In regular conditions, it's equivalent to just releasing `actual`.
             let len = len.min(actual);
 
             // release the entries back to the RX ring

From 8721eebb52c5e7d313c970f855e139452d73d8eb Mon Sep 17 00:00:00 2001
From: Cameron Bytheway
Date: Tue, 2 May 2023 10:23:47 -0600
Subject: [PATCH 5/5] Update tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs

Co-authored-by: Wesley Rosenblum <55108558+WesleyRosenblum@users.noreply.github.com>
---
 tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs
index 66abdf037e..72f3f0df31 100644
--- a/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs
+++ b/tools/xdp/s2n-quic-xdp/src/task/completion_to_tx.rs
@@ -33,7 +33,7 @@ pub async fn completion_to_tx(
         frame_size.is_power_of_two(),
         tx_queues.len().is_power_of_two(),
     ) {
-        (0, _, _) => panic!("invalid must be non-zero length"),
+        (0, _, _) => panic!("tx_queues must be non-zero length"),
         (1, _, _) => {
             trace!("using single queue mode");
             CompletionRingToTx {
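
To make the aligned-assignment math in the patches above concrete, here is a minimal, self-contained Rust sketch. It is not the crate's actual API: `AlignedAssign`, `frame_shift`, and `queue_mask` are illustrative names, and the real code composes separate `frame_to_index` and `index_to_queue` steps. It only shows why the patches insist that `frame_size` and the number of TX queues are powers of two: the division and modulo needed to route a completed descriptor collapse into a shift and a mask.

    /// Illustrative only: routes a completed umem address back to the TX
    /// queue that owns its frame, assuming power-of-two sizes throughout.
    struct AlignedAssign {
        /// frame_size.trailing_zeros(); dividing by a power-of-two frame
        /// size is a right shift
        frame_shift: u32,
        /// queue_count - 1; reducing modulo a power-of-two queue count is
        /// a bitwise AND
        queue_mask: u64,
    }

    impl AlignedAssign {
        fn new(frame_size: u32, queue_count: u64) -> Self {
            assert!(frame_size.is_power_of_two() && frame_size > 2);
            assert!(queue_count.is_power_of_two());
            Self {
                frame_shift: frame_size.trailing_zeros(),
                queue_mask: queue_count - 1,
            }
        }

        /// Returns true if the frame containing `address` belongs to `queue_idx`
        fn is_assigned(&self, address: u64, queue_idx: u64) -> bool {
            let frame_index = address >> self.frame_shift; // address / frame_size
            (frame_index & self.queue_mask) == queue_idx // frame_index % queue_count
        }
    }

    fn main() {
        let assign = AlignedAssign::new(4096, 4);
        // frame index 3 maps to queue 3, and every offset within the frame
        // agrees -- the property the offset loop in the tests exercises
        assert!(assign.is_assigned(0x3000, 3));
        assert!(assign.is_assigned(0x3fff, 3));
        assert!(!assign.is_assigned(0x3000, 0));
    }

The multiple_worker_unaligned tests above (frame_size = 17) exercise the opposite case, where this shortcut is unavailable and the mapping cannot rely on shifts and masks.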