diff --git a/quic/s2n-quic-core/Cargo.toml b/quic/s2n-quic-core/Cargo.toml index 22e656ae1d..0096506ce8 100644 --- a/quic/s2n-quic-core/Cargo.toml +++ b/quic/s2n-quic-core/Cargo.toml @@ -30,6 +30,7 @@ hex-literal = "0.4" insta = { version = ">=1.12", features = ["json"], optional = true } num-rational = { version = "0.4", default-features = false } num-traits = { version = "0.2", default-features = false, features = ["libm"] } +pin-project-lite = { version = "0.2" } s2n-codec = { version = "=0.5.0", path = "../../common/s2n-codec", default-features = false } subtle = { version = "2", default-features = false } tracing = { version = "0.1", default-features = false, optional = true } diff --git a/quic/s2n-quic-core/src/io/event_loop.rs b/quic/s2n-quic-core/src/io/event_loop.rs new file mode 100644 index 0000000000..86c57f0b40 --- /dev/null +++ b/quic/s2n-quic-core/src/io/event_loop.rs @@ -0,0 +1,164 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + endpoint::Endpoint, + event::{self, EndpointPublisher}, + io::{rx::Rx, tx::Tx}, + time::clock::{ClockWithTimer, Timer}, +}; +use core::pin::Pin; + +pub mod select; +use select::Select; + +pub struct EventLoop { + pub endpoint: E, + pub clock: C, + pub rx: R, + pub tx: T, +} + +impl EventLoop +where + E: Endpoint, + C: ClockWithTimer, + R: Rx, + T: Tx, +{ + /// Starts running the endpoint event loop in an async task + pub async fn start(self) { + let Self { + mut endpoint, + clock, + mut rx, + mut tx, + } = self; + + /// Creates a event publisher with the endpoint's subscriber + macro_rules! publisher { + ($timestamp:expr) => {{ + let timestamp = $timestamp; + let subscriber = endpoint.subscriber(); + event::EndpointPublisherSubscriber::new( + event::builder::EndpointMeta { + endpoint_type: E::ENDPOINT_TYPE, + timestamp, + }, + None, + subscriber, + ) + }}; + } + + let mut timer = clock.timer(); + + loop { + // Poll for RX readiness + let rx_ready = rx.ready(); + + // Poll for TX readiness + let tx_ready = tx.ready(); + + // Poll for any application-driven updates + let mut wakeups = endpoint.wakeups(&clock); + + // TODO use the [pin macro](https://doc.rust-lang.org/std/pin/macro.pin.html) once + // available in MSRV + let wakeups = unsafe { + // Safety: the wakeups future is on the stack and won't move + Pin::new_unchecked(&mut wakeups) + }; + + // Poll for timer expiration + let timer_ready = timer.ready(); + + // Concurrently poll all of the futures and wake up on the first one that's ready + let select = Select::new(rx_ready, tx_ready, wakeups, timer_ready); + + let select::Outcome { + rx_result, + tx_result, + timeout_expired, + application_wakeup, + } = if let Ok(outcome) = select.await { + outcome + } else { + // The endpoint has shut down; stop the event loop + return; + }; + + // notify the application that we woke up and why + let wakeup_timestamp = clock.get_time(); + publisher!(wakeup_timestamp).on_platform_event_loop_wakeup( + event::builder::PlatformEventLoopWakeup { + timeout_expired, + rx_ready: rx_result.is_some(), + tx_ready: tx_result.is_some(), + application_wakeup, + }, + ); + + match rx_result { + Some(Ok(())) => { + // we received some packets. give them to the endpoint. + rx.queue(|queue| { + endpoint.receive(queue, &clock); + }); + } + Some(Err(error)) => { + // The RX provider has encountered an error. shut down the event loop + let mut publisher = publisher!(clock.get_time()); + rx.handle_error(error, &mut publisher); + return; + } + None => { + // We didn't receive any packets; nothing to do + } + } + + match tx_result { + Some(Ok(())) => { + // The TX queue was full and now has capacity. The endpoint can now continue to + // transmit + } + Some(Err(error)) => { + // The RX provider has encountered an error. shut down the event loop + let mut publisher = publisher!(clock.get_time()); + tx.handle_error(error, &mut publisher); + return; + } + None => { + // The TX queue is either waiting to be flushed or has capacity. Either way, we + // call `endpoint.transmit` to at least update the clock and poll any timer + // expirations. + } + } + + // Let the endpoint transmit, if possible + tx.queue(|queue| { + endpoint.transmit(queue, &clock); + }); + + // Get the next expiration from the endpoint and update the timer + let timeout = endpoint.timeout(); + if let Some(timeout) = timeout { + timer.update(timeout); + } + + let sleep_timestamp = clock.get_time(); + // compute the relative timeout to the current time + let timeout = timeout.map(|t| t.saturating_duration_since(sleep_timestamp)); + // compute how long it took to process the current iteration + let processing_duration = sleep_timestamp.saturating_duration_since(wakeup_timestamp); + + // publish the event to the application + publisher!(sleep_timestamp).on_platform_event_loop_sleep( + event::builder::PlatformEventLoopSleep { + timeout, + processing_duration, + }, + ); + } + } +} diff --git a/quic/s2n-quic-platform/src/io/select.rs b/quic/s2n-quic-core/src/io/event_loop/select.rs similarity index 65% rename from quic/s2n-quic-platform/src/io/select.rs rename to quic/s2n-quic-core/src/io/event_loop/select.rs index bec2feaa19..6fd3cc1d31 100644 --- a/quic/s2n-quic-platform/src/io/select.rs +++ b/quic/s2n-quic-core/src/io/event_loop/select.rs @@ -1,39 +1,37 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +use crate::endpoint::CloseError; use core::{ future::Future, pin::Pin, task::{Context, Poll}, }; -use futures::future::{Fuse, FutureExt}; -use pin_project::pin_project; -use s2n_quic_core::endpoint::CloseError; +use pin_project_lite::pin_project; -/// The main event loop future for selecting readiness of sub-tasks -/// -/// This future ensures all sub-tasks are polled fairly by yielding once -/// after completing any of the sub-tasks. This is especially important when the TX queue is -/// flushed quickly and we never get notified of the RX socket having packets to read. -#[pin_project] -pub struct Select -where - Rx: Future, - Tx: Future, - Wakeup: Future, - Sleep: Future, -{ - #[pin] - rx: Fuse, - rx_out: Option, - #[pin] - tx: Fuse, - tx_out: Option, - #[pin] - wakeup: Fuse, - #[pin] - sleep: Sleep, -} +pin_project!( + /// The main event loop future for selecting readiness of sub-tasks + /// + /// This future ensures all sub-tasks are polled fairly by yielding once + /// after completing any of the sub-tasks. This is especially important when the TX queue is + /// flushed quickly and we never get notified of the RX socket having packets to read. + pub struct Select + where + Rx: Future, + Tx: Future, + Wakeup: Future, + Sleep: Future, + { + #[pin] + rx: Rx, + #[pin] + tx: Tx, + #[pin] + wakeup: Wakeup, + #[pin] + sleep: Sleep, + } +); impl Select where @@ -45,11 +43,9 @@ where #[inline(always)] pub fn new(rx: Rx, tx: Tx, wakeup: Wakeup, sleep: Sleep) -> Self { Self { - rx: rx.fuse(), - rx_out: None, - tx: tx.fuse(), - tx_out: None, - wakeup: wakeup.fuse(), + rx, + tx, + wakeup, sleep, } } @@ -88,14 +84,16 @@ where } } + let mut rx_result = None; if let Poll::Ready(v) = this.rx.poll(cx) { should_wake = true; - *this.rx_out = Some(v); + rx_result = Some(v); } + let mut tx_result = None; if let Poll::Ready(v) = this.tx.poll(cx) { should_wake = true; - *this.tx_out = Some(v); + tx_result = Some(v); } let mut timeout_expired = false; @@ -111,8 +109,8 @@ where } Poll::Ready(Ok(Outcome { - rx_result: this.rx_out.take(), - tx_result: this.tx_out.take(), + rx_result, + tx_result, timeout_expired, application_wakeup, })) diff --git a/quic/s2n-quic-core/src/io/mod.rs b/quic/s2n-quic-core/src/io/mod.rs index 627526649f..2a989d5a13 100644 --- a/quic/s2n-quic-core/src/io/mod.rs +++ b/quic/s2n-quic-core/src/io/mod.rs @@ -1,5 +1,6 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +pub mod event_loop; pub mod rx; pub mod tx; diff --git a/quic/s2n-quic-platform/Cargo.toml b/quic/s2n-quic-platform/Cargo.toml index 63d75360cd..2803eae1ed 100644 --- a/quic/s2n-quic-platform/Cargo.toml +++ b/quic/s2n-quic-platform/Cargo.toml @@ -16,7 +16,7 @@ std = ["s2n-quic-core/std", "socket2", "lazy_static"] testing = ["std", "generator", "futures/std", "io-testing"] # Testing allows to overwrite the system time io-testing = ["bach"] generator = ["bolero-generator", "s2n-quic-core/generator"] -tokio-runtime = ["futures", "pin-project", "tokio"] +tokio-runtime = ["futures", "tokio"] [dependencies] bach = { version = "0.0.6", optional = true } @@ -25,7 +25,6 @@ cfg-if = "1" errno = "0.3" futures = { version = "0.3", default-features = false, features = ["async-await"], optional = true } lazy_static = { version = "1", optional = true } -pin-project = { version = "1", optional = true } s2n-quic-core = { version = "=0.19.0", path = "../s2n-quic-core", default-features = false } socket2 = { version = "0.5", features = ["all"], optional = true } tokio = { version = "1", default-features = false, features = ["macros", "net", "rt", "time"], optional = true } diff --git a/quic/s2n-quic-platform/src/io.rs b/quic/s2n-quic-platform/src/io.rs index f6e42e3e56..5a8bcc0929 100644 --- a/quic/s2n-quic-platform/src/io.rs +++ b/quic/s2n-quic-platform/src/io.rs @@ -1,8 +1,6 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -mod select; - #[cfg(feature = "tokio")] pub mod tokio; diff --git a/quic/s2n-quic-platform/src/io/testing.rs b/quic/s2n-quic-platform/src/io/testing.rs index 8bfd17ea20..040499ed05 100644 --- a/quic/s2n-quic-platform/src/io/testing.rs +++ b/quic/s2n-quic-platform/src/io/testing.rs @@ -1,15 +1,10 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use super::select::{self, Select}; use bach::time::scheduler; -use core::{pin::Pin, task::Poll}; +use core::task::Poll; use s2n_quic_core::{ - endpoint::Endpoint, - event::{self, EndpointPublisher as _}, - inet::SocketAddress, - path::MaxMtu, - time::clock::Timer as _, + endpoint::Endpoint, inet::SocketAddress, io::event_loop::EventLoop, path::MaxMtu, }; type Error = std::io::Error; @@ -264,96 +259,17 @@ impl Io { let handle = address.unwrap_or_else(|| buffers.generate_addr()); - buffers.register(handle); + let (tx, rx) = buffers.register(handle); - let instance = Instance { - buffers, - handle, + let clock = time::Clock::default(); + + let event_loop = EventLoop { endpoint, + clock, + tx, + rx, }; - let join = executor.spawn(instance.event_loop()); + let join = executor.spawn(event_loop.start()); Ok((join, handle)) } } - -struct Instance { - buffers: network::Buffers, - handle: SocketAddress, - endpoint: E, -} - -impl> Instance { - async fn event_loop(self) { - let Self { - buffers, - handle, - mut endpoint, - } = self; - - let clock = time::Clock::default(); - let mut timer = time::Timer::default(); - - loop { - let io_task = buffers.readiness(handle); - - // make a future that never returns since we have a single future that checks both - let empty_task = futures::future::pending::<()>(); - - let mut wakeups = endpoint.wakeups(&clock); - let mut wakeups = Pin::new(&mut wakeups); - - let timer_ready = timer.ready(); - - let select::Outcome { - rx_result, - tx_result, - timeout_expired, - application_wakeup, - } = if let Ok(res) = Select::new(io_task, empty_task, &mut wakeups, timer_ready).await { - res - } else { - // The endpoint has shut down - return; - }; - - let wakeup_timestamp = time::now(); - let subscriber = endpoint.subscriber(); - let mut publisher = event::EndpointPublisherSubscriber::new( - event::builder::EndpointMeta { - endpoint_type: E::ENDPOINT_TYPE, - timestamp: wakeup_timestamp, - }, - None, - subscriber, - ); - - publisher.on_platform_event_loop_wakeup(event::builder::PlatformEventLoopWakeup { - timeout_expired, - rx_ready: rx_result.is_some(), - tx_ready: tx_result.is_some(), - application_wakeup, - }); - - if let Some(result) = rx_result { - if result.is_err() { - // the endpoint shut down - return; - } - - buffers.rx(handle, |queue| { - endpoint.receive(queue, &clock); - }); - } - - buffers.tx(handle, |queue| { - endpoint.transmit(queue, &clock); - }); - - if let Some(timestamp) = endpoint.timeout() { - timer.update(timestamp); - } else { - timer.cancel(); - } - } - } -} diff --git a/quic/s2n-quic-platform/src/io/testing/network.rs b/quic/s2n-quic-platform/src/io/testing/network.rs index 67929a9854..f3b434ec3d 100644 --- a/quic/s2n-quic-platform/src/io/testing/network.rs +++ b/quic/s2n-quic-platform/src/io/testing/network.rs @@ -1,15 +1,12 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use core::{ - future::Future, - pin::Pin, - task::{Context, Poll, Waker}, -}; +use core::task::{Context, Poll, Waker}; use s2n_quic_core::{ + event, inet::{datagram, ExplicitCongestionNotification, SocketAddress}, io::{ - self, + self, rx, tx::{self, Queue as _}, }, path::{LocalAddress, Tuple}, @@ -147,13 +144,6 @@ impl Buffers { n.execute(self); } - pub(crate) fn readiness(&self, handle: SocketAddress) -> Readiness { - Readiness { - network: self, - handle, - } - } - /// Generate a unique address pub fn generate_addr(&self) -> SocketAddress { let ip = self @@ -167,58 +157,102 @@ impl Buffers { } /// Register an address on the network - pub fn register(&self, handle: SocketAddress) { + pub fn register(&self, handle: SocketAddress) -> (TxIo, RxIo) { let mut lock = self.inner.lock().unwrap(); let queue = Queue::new(handle); lock.tx.insert(handle, queue.clone()); lock.rx.insert(handle, queue); + + let tx = TxIo { + buffers: self.clone(), + handle, + }; + let rx = RxIo { + buffers: self.clone(), + handle, + }; + + (tx, rx) } } -pub(crate) struct Readiness<'a> { - network: &'a Buffers, +pub struct TxIo { + buffers: Buffers, handle: SocketAddress, } -impl<'a> Future for Readiness<'a> { - type Output = Result<(), ()>; +impl tx::Tx for TxIo { + type PathHandle = Tuple; + type Queue = Queue; + type Error = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut lock = self.network.inner.lock().unwrap(); + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + let mut lock = self.buffers.inner.lock().unwrap(); if !lock.is_open { return Err(()).into(); } - let mut is_ready = false; - let tx = lock.tx.get_mut(&self.handle).unwrap(); - if tx.is_blocked { - // if we were blocked and now have capacity wake up the endpoint - if tx.has_capacity() { - tx.is_blocked = false; - is_ready = true; - } else { - tx.waker = Some(cx.waker().clone()); - } - } - let rx = lock.rx.get_mut(&self.handle).unwrap(); - // wake up the endpoint if we have an rx message - if io::rx::Queue::is_empty(rx) { - rx.waker = Some(cx.waker().clone()); - } else { - is_ready = true; + // If we weren't previously full, then return pending so we don't spin + if !tx.is_blocked { + return Poll::Pending; } - if is_ready { + // if we were blocked and now have capacity wake up the endpoint + if tx.has_capacity() { + tx.is_blocked = false; Poll::Ready(Ok(())) } else { + tx.waker = Some(cx.waker().clone()); Poll::Pending } } + + fn queue(&mut self, f: F) { + self.buffers.tx(self.handle, f); + } + + fn handle_error(self, _error: Self::Error, _events: &mut E) {} +} + +pub struct RxIo { + buffers: Buffers, + handle: SocketAddress, +} + +impl rx::Rx for RxIo { + type PathHandle = Tuple; + type Queue = Queue; + type Error = (); + + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + let mut lock = self.buffers.inner.lock().unwrap(); + + if !lock.is_open { + return Err(()).into(); + } + + let rx = lock.rx.get_mut(&self.handle).unwrap(); + + // wake up the endpoint if we have an rx message + if !io::rx::Queue::is_empty(rx) { + return Poll::Ready(Ok(())); + } + + // store the waker for later notifications + rx.waker = Some(cx.waker().clone()); + Poll::Pending + } + + fn queue(&mut self, f: F) { + self.buffers.rx(self.handle, f); + } + + fn handle_error(self, _error: Self::Error, _events: &mut E) {} } #[derive(Debug)] diff --git a/quic/s2n-quic-platform/src/io/tokio.rs b/quic/s2n-quic-platform/src/io/tokio.rs index cbc3a330cd..87897dc5af 100644 --- a/quic/s2n-quic-platform/src/io/tokio.rs +++ b/quic/s2n-quic-platform/src/io/tokio.rs @@ -1,13 +1,13 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use super::select::{self, Select}; use crate::{buffer::default as buffer, features::gso, socket::default as socket, syscall}; use cfg_if::cfg_if; use s2n_quic_core::{ endpoint::Endpoint, event::{self, EndpointPublisher as _}, inet::{self, SocketAddress}, + io::event_loop::select::{self, Select}, path::MaxMtu, time::{ clock::{ClockWithTimer as _, Timer as _}, diff --git a/quic/s2n-quic-platform/src/io/turmoil.rs b/quic/s2n-quic-platform/src/io/turmoil.rs index aacbb06f8a..1936fba05a 100644 --- a/quic/s2n-quic-platform/src/io/turmoil.rs +++ b/quic/s2n-quic-platform/src/io/turmoil.rs @@ -1,12 +1,12 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use super::select::{self, Select}; use crate::{buffer::default as buffer, io::tokio::Clock, socket::std as socket}; use s2n_quic_core::{ endpoint::Endpoint, event::{self, EndpointPublisher as _}, inet::SocketAddress, + io::event_loop::select::{self, Select}, path::MaxMtu, time::{ clock::{ClockWithTimer as _, Timer as _},