Skip to content

Commit

Permalink
feat(s2n-quic-xdp): implement tx/rx traits for shared tokio sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed May 11, 2023
1 parent 89ea847 commit a34fa7d
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 90 deletions.
73 changes: 3 additions & 70 deletions tools/xdp/s2n-quic-xdp/src/task/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub async fn rx<P: Poller, N: Notifier>(poller: P, rx: ring::Rx, notifier: N) {
.await;
}

#[cfg(feature = "tokio")]
mod tokio_impl;

/// Polls a socket for pending RX items
pub trait Poller: Unpin {
fn poll<F: FnMut(&mut ring::Rx, &mut Context) -> Option<usize>>(
Expand Down Expand Up @@ -128,76 +131,6 @@ impl Poller for worker::Receiver {
}
}

#[cfg(feature = "tokio")]
/// Polling implementation for an asynchronous socket
impl Poller for tokio::io::unix::AsyncFd<socket::Fd> {
#[inline]
fn poll<F: FnMut(&mut ring::Rx, &mut Context) -> Option<usize>>(
&mut self,
rx: &mut ring::Rx,
cx: &mut Context,
mut on_ready: F,
) -> Poll<Result<(), ()>> {
// limit the number of loops to prevent endless spinning on registering wakers
for iteration in 0..10 {
trace!("iteration {}", iteration);

// query socket readiness through tokio's polling facilities
match self.poll_read_ready(cx) {
Poll::Ready(Ok(mut guard)) => {
// try to acquire entries for the queue
let count = rx.acquire(1) as usize;

trace!("acquired {count} items from RX ring");

// if we didn't get anything, we need to clear readiness and try again
if count == 0 {
guard.clear_ready();
trace!("clearing socket readiness and trying again");
continue;
}

// we have at least one entry so notify the callback
match on_ready(rx, cx) {
Some(actual) => {
trace!("consumed {actual} items");

// if we consumed all of the acquired items we'll need to poll the
// queue again for readiness so we can register a waker.
if actual >= count {
trace!("clearing socket readiness and trying again");
guard.clear_ready();
}

continue;
}
None => {
trace!("on_ready closed; closing receiver");

return Poll::Ready(Err(()));
}
}
}
Poll::Ready(Err(err)) => {
trace!("socket returned an error while polling: {err:?}; closing poller");
return Poll::Ready(Err(()));
}
Poll::Pending => {
trace!("ring out of items; sleeping");
return Poll::Pending;
}
}
}

// if we got here, we iterated 10 times and need to yield so we don't consume the event
// loop too much
trace!("waking self");
cx.waker().wake_by_ref();

Poll::Pending
}
}

/// Notifies an RX worker than new entries are available
pub trait Notifier: Unpin {
fn notify(
Expand Down
99 changes: 99 additions & 0 deletions tools/xdp/s2n-quic-xdp/src/task/rx/tokio_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

type Fd = tokio::io::unix::AsyncFd<socket::Fd>;

/// Polls read readiness for a tokio socket
#[inline]
fn poll<F: FnMut(&mut ring::Rx, &mut Context) -> Option<usize>>(
fd: &Fd,
rx: &mut ring::Rx,
cx: &mut Context,
mut on_ready: F,
) -> Poll<Result<(), ()>> {
// limit the number of loops to prevent endless spinning on registering wakers
for iteration in 0..10 {
trace!("iteration {}", iteration);

// query socket readiness through tokio's polling facilities
match fd.poll_read_ready(cx) {
Poll::Ready(Ok(mut guard)) => {
// try to acquire entries for the queue
let count = rx.acquire(1) as usize;

trace!("acquired {count} items from RX ring");

// if we didn't get anything, we need to clear readiness and try again
if count == 0 {
guard.clear_ready();
trace!("clearing socket readiness and trying again");
continue;
}

// we have at least one entry so notify the callback
match on_ready(rx, cx) {
Some(actual) => {
trace!("consumed {actual} items");

// if we consumed all of the acquired items we'll need to poll the
// queue again for readiness so we can register a waker.
if actual >= count {
trace!("clearing socket readiness and trying again");
guard.clear_ready();
}

continue;
}
None => {
trace!("on_ready closed; closing receiver");

return Poll::Ready(Err(()));
}
}
}
Poll::Ready(Err(err)) => {
trace!("socket returned an error while polling: {err:?}; closing poller");
return Poll::Ready(Err(()));
}
Poll::Pending => {
trace!("ring out of items; sleeping");
return Poll::Pending;
}
}
}

// if we got here, we iterated 10 times and need to yield so we don't consume the event
// loop too much
trace!("waking self");
cx.waker().wake_by_ref();

Poll::Pending
}

/// Polling implementation for an asynchronous socket
impl Poller for Fd {
#[inline]
fn poll<F: FnMut(&mut ring::Rx, &mut Context) -> Option<usize>>(
&mut self,
rx: &mut ring::Rx,
cx: &mut Context,
on_ready: F,
) -> Poll<Result<(), ()>> {
poll(self, rx, cx, on_ready)
}
}

/// Polling implementation for a shared asynchronous socket
impl Poller for std::sync::Arc<Fd> {
#[inline]
fn poll<F: FnMut(&mut ring::Rx, &mut Context) -> Option<usize>>(
&mut self,
rx: &mut ring::Rx,
cx: &mut Context,
on_ready: F,
) -> Poll<Result<(), ()>> {
poll(self, rx, cx, on_ready)
}
}
78 changes: 58 additions & 20 deletions tools/xdp/s2n-quic-xdp/src/task/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,85 @@ pub async fn tx<N: Notifier>(outgoing: spsc::Receiver<RxTxDescriptor>, tx: ring:
.await;
}

#[cfg(feature = "tokio")]
mod tokio_impl;

/// Notifies the implementor of progress on the TX ring
pub trait Notifier: Unpin {
fn notify(&mut self, tx: &mut ring::Tx, count: u32);
/// Notifies the subject that `count` items were transmitted on the TX ring
fn notify(&mut self, tx: &mut ring::Tx, cx: &mut Context, count: u32);
/// Notifies the subject that the TX ring doesn't have any capacity for transmission
fn notify_empty(&mut self, tx: &mut ring::Tx, cx: &mut Context) -> Poll<()>;
}

impl Notifier for () {
#[inline]
fn notify(&mut self, _tx: &mut ring::Tx, _count: u32) {
fn notify(&mut self, _tx: &mut ring::Tx, _cx: &mut Context, _count: u32) {
// nothing to do
}

#[inline]
fn notify_empty(&mut self, _tx: &mut ring::Tx, _cx: &mut Context) -> Poll<()> {
// nothing to do
Poll::Ready(())
}
}

impl<A: Notifier, B: Notifier> Notifier for (A, B) {
#[inline]
fn notify(&mut self, tx: &mut ring::Tx, count: u32) {
self.0.notify(tx, count);
self.1.notify(tx, count);
fn notify(&mut self, tx: &mut ring::Tx, cx: &mut Context, count: u32) {
self.0.notify(tx, cx, count);
self.1.notify(tx, cx, count);
}

#[inline]
fn notify_empty(&mut self, tx: &mut ring::Tx, cx: &mut Context) -> Poll<()> {
let a = self.0.notify_empty(tx, cx);
let b = self.1.notify_empty(tx, cx);
if a.is_ready() && b.is_ready() {
a
} else {
Poll::Pending
}
}
}

impl Notifier for worker::Sender {
#[inline]
fn notify(&mut self, _tx: &mut ring::Tx, count: u32) {
if count > 0 {
trace!("notifying worker to wake up with {count} entries");
self.submit(count as _);
}
fn notify(&mut self, _tx: &mut ring::Tx, _cx: &mut Context, count: u32) {
trace!("notifying worker to wake up with {count} entries");
self.submit(count as _);
}

#[inline]
fn notify_empty(&mut self, tx: &mut ring::Tx, _cx: &mut Context) -> Poll<()> {
// there is no feedback mechanism for the worker::Sender so do nothing
let _ = tx;
Poll::Ready(())
}
}

impl Notifier for socket::Fd {
#[inline]
fn notify(&mut self, tx: &mut ring::Tx, _count: u32) {
fn notify(&mut self, tx: &mut ring::Tx, cx: &mut Context, _count: u32) {
// notify the socket to ensure progress regardless of transmission count
let _ = self.notify_empty(tx, cx);
}

#[inline]
fn notify_empty(&mut self, tx: &mut ring::Tx, _cx: &mut Context) -> Poll<()> {
// only notify the socket if it's set the needs wakeup flag
if !tx.needs_wakeup() {
trace!("TX ring doesn't need wake, returning early");
return;
return Poll::Ready(());
}

trace!("TX ring needs wakeup");
let result = syscall::wake_tx(self);

trace!("waking tx for progress {result:?}");

Poll::Ready(())
}
}

Expand Down Expand Up @@ -109,8 +145,12 @@ impl<N: Notifier> Future for Tx<N> {
trace!("acquired {count} items from TX ring");

if count == 0 {
notifier.notify(tx, count);
continue;
// we couldn't acquire any items so notify the socket that we don't have capacity
if notifier.notify_empty(tx, cx).is_ready() {
continue;
} else {
return Poll::Pending;
}
}

let mut outgoing = outgoing.slice();
Expand All @@ -120,13 +160,11 @@ impl<N: Notifier> Future for Tx<N> {
let count = vectored_copy(&[rx_head, rx_tail], &mut [tx_head, tx_tail]);

trace!("copied {count} items into TX ring");
debug_assert_ne!(count, 0);

if count > 0 {
tx.release(count as _);
outgoing.release(count);
}

notifier.notify(tx, count as _);
tx.release(count as _);
outgoing.release(count);
notifier.notify(tx, cx, count as _);
}

// if we got here, we iterated 10 times and need to yield so we don't consume the event
Expand Down
Loading

0 comments on commit a34fa7d

Please sign in to comment.