Skip to content

Commit

Permalink
feat(s2n-quic-core): add generic IO event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed May 3, 2023
1 parent 1b4cbaa commit 69e2671
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 174 deletions.
1 change: 1 addition & 0 deletions quic/s2n-quic-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ hex-literal = "0.4"
insta = { version = ">=1.12", features = ["json"], optional = true }
num-rational = { version = "0.4", default-features = false }
num-traits = { version = "0.2", default-features = false, features = ["libm"] }
pin-project-lite = { version = "0.2" }
s2n-codec = { version = "=0.5.0", path = "../../common/s2n-codec", default-features = false }
subtle = { version = "2", default-features = false }
tracing = { version = "0.1", default-features = false, optional = true }
Expand Down
164 changes: 164 additions & 0 deletions quic/s2n-quic-core/src/io/event_loop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
endpoint::Endpoint,
event::{self, EndpointPublisher},
io::{rx::Rx, tx::Tx},
time::clock::{ClockWithTimer, Timer},
};
use core::pin::Pin;

pub mod select;
use select::Select;

pub struct EventLoop<E, C, R, T> {
pub endpoint: E,
pub clock: C,
pub rx: R,
pub tx: T,
}

impl<E, C, R, T> EventLoop<E, C, R, T>
where
E: Endpoint,
C: ClockWithTimer,
R: Rx<PathHandle = E::PathHandle>,
T: Tx<PathHandle = E::PathHandle>,
{
/// Starts running the endpoint event loop in an async task
pub async fn start(self) {
let Self {
mut endpoint,
clock,
mut rx,
mut tx,
} = self;

/// Creates a event publisher with the endpoint's subscriber
macro_rules! publisher {
($timestamp:expr) => {{
let timestamp = $timestamp;
let subscriber = endpoint.subscriber();
event::EndpointPublisherSubscriber::new(
event::builder::EndpointMeta {
endpoint_type: E::ENDPOINT_TYPE,
timestamp,
},
None,
subscriber,
)
}};
}

let mut timer = clock.timer();

loop {
// Poll for RX readiness
let rx_ready = rx.ready();

// Poll for TX readiness
let tx_ready = tx.ready();

// Poll for any application-driven updates
let mut wakeups = endpoint.wakeups(&clock);

// TODO use the [pin macro](https://doc.rust-lang.org/std/pin/macro.pin.html) once
// available in MSRV
let wakeups = unsafe {
// Safety: the wakeups future is on the stack and won't move
Pin::new_unchecked(&mut wakeups)
};

// Poll for timer expiration
let timer_ready = timer.ready();

// Concurrently poll all of the futures and wake up on the first one that's ready
let select = Select::new(rx_ready, tx_ready, wakeups, timer_ready);

let select::Outcome {
rx_result,
tx_result,
timeout_expired,
application_wakeup,
} = if let Ok(outcome) = select.await {
outcome
} else {
// The endpoint has shut down; stop the event loop
return;
};

// notify the application that we woke up and why
let wakeup_timestamp = clock.get_time();
publisher!(wakeup_timestamp).on_platform_event_loop_wakeup(
event::builder::PlatformEventLoopWakeup {
timeout_expired,
rx_ready: rx_result.is_some(),
tx_ready: tx_result.is_some(),
application_wakeup,
},
);

match rx_result {
Some(Ok(())) => {
// we received some packets. give them to the endpoint.
rx.queue(|queue| {
endpoint.receive(queue, &clock);
});
}
Some(Err(error)) => {
// The RX provider has encountered an error. shut down the event loop
let mut publisher = publisher!(clock.get_time());
rx.handle_error(error, &mut publisher);
return;
}
None => {
// We didn't receive any packets; nothing to do
}
}

match tx_result {
Some(Ok(())) => {
// The TX queue was full and now has capacity. The endpoint can now continue to
// transmit
}
Some(Err(error)) => {
// The RX provider has encountered an error. shut down the event loop
let mut publisher = publisher!(clock.get_time());
tx.handle_error(error, &mut publisher);
return;
}
None => {
// The TX queue is either waiting to be flushed or has capacity. Either way, we
// call `endpoint.transmit` to at least update the clock and poll any timer
// expirations.
}
}

// Let the endpoint transmit, if possible
tx.queue(|queue| {
endpoint.transmit(queue, &clock);
});

// Get the next expiration from the endpoint and update the timer
let timeout = endpoint.timeout();
if let Some(timeout) = timeout {
timer.update(timeout);
}

let sleep_timestamp = clock.get_time();
// compute the relative timeout to the current time
let timeout = timeout.map(|t| t.saturating_duration_since(sleep_timestamp));
// compute how long it took to process the current iteration
let processing_duration = sleep_timestamp.saturating_duration_since(wakeup_timestamp);

// publish the event to the application
publisher!(sleep_timestamp).on_platform_event_loop_sleep(
event::builder::PlatformEventLoopSleep {
timeout,
processing_duration,
},
);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,37 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::endpoint::CloseError;
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::future::{Fuse, FutureExt};
use pin_project::pin_project;
use s2n_quic_core::endpoint::CloseError;
use pin_project_lite::pin_project;

/// The main event loop future for selecting readiness of sub-tasks
///
/// This future ensures all sub-tasks are polled fairly by yielding once
/// after completing any of the sub-tasks. This is especially important when the TX queue is
/// flushed quickly and we never get notified of the RX socket having packets to read.
#[pin_project]
pub struct Select<Rx, Tx, Wakeup, Sleep>
where
Rx: Future,
Tx: Future,
Wakeup: Future,
Sleep: Future,
{
#[pin]
rx: Fuse<Rx>,
rx_out: Option<Rx::Output>,
#[pin]
tx: Fuse<Tx>,
tx_out: Option<Tx::Output>,
#[pin]
wakeup: Fuse<Wakeup>,
#[pin]
sleep: Sleep,
}
pin_project!(
/// The main event loop future for selecting readiness of sub-tasks
///
/// This future ensures all sub-tasks are polled fairly by yielding once
/// after completing any of the sub-tasks. This is especially important when the TX queue is
/// flushed quickly and we never get notified of the RX socket having packets to read.
pub struct Select<Rx, Tx, Wakeup, Sleep>
where
Rx: Future,
Tx: Future,
Wakeup: Future,
Sleep: Future,
{
#[pin]
rx: Rx,
#[pin]
tx: Tx,
#[pin]
wakeup: Wakeup,
#[pin]
sleep: Sleep,
}
);

impl<Rx, Tx, Wakeup, Sleep> Select<Rx, Tx, Wakeup, Sleep>
where
Expand All @@ -45,11 +43,9 @@ where
#[inline(always)]
pub fn new(rx: Rx, tx: Tx, wakeup: Wakeup, sleep: Sleep) -> Self {
Self {
rx: rx.fuse(),
rx_out: None,
tx: tx.fuse(),
tx_out: None,
wakeup: wakeup.fuse(),
rx,
tx,
wakeup,
sleep,
}
}
Expand Down Expand Up @@ -88,14 +84,16 @@ where
}
}

let mut rx_result = None;
if let Poll::Ready(v) = this.rx.poll(cx) {
should_wake = true;
*this.rx_out = Some(v);
rx_result = Some(v);
}

let mut tx_result = None;
if let Poll::Ready(v) = this.tx.poll(cx) {
should_wake = true;
*this.tx_out = Some(v);
tx_result = Some(v);
}

let mut timeout_expired = false;
Expand All @@ -111,8 +109,8 @@ where
}

Poll::Ready(Ok(Outcome {
rx_result: this.rx_out.take(),
tx_result: this.tx_out.take(),
rx_result,
tx_result,
timeout_expired,
application_wakeup,
}))
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic-core/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

pub mod event_loop;
pub mod rx;
pub mod tx;
3 changes: 1 addition & 2 deletions quic/s2n-quic-platform/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ std = ["s2n-quic-core/std", "socket2", "lazy_static"]
testing = ["std", "generator", "futures/std", "io-testing"] # Testing allows to overwrite the system time
io-testing = ["bach"]
generator = ["bolero-generator", "s2n-quic-core/generator"]
tokio-runtime = ["futures", "pin-project", "tokio"]
tokio-runtime = ["futures", "tokio"]

[dependencies]
bach = { version = "0.0.6", optional = true }
Expand All @@ -25,7 +25,6 @@ cfg-if = "1"
errno = "0.3"
futures = { version = "0.3", default-features = false, features = ["async-await"], optional = true }
lazy_static = { version = "1", optional = true }
pin-project = { version = "1", optional = true }
s2n-quic-core = { version = "=0.19.0", path = "../s2n-quic-core", default-features = false }
socket2 = { version = "0.5", features = ["all"], optional = true }
tokio = { version = "1", default-features = false, features = ["macros", "net", "rt", "time"], optional = true }
Expand Down
2 changes: 0 additions & 2 deletions quic/s2n-quic-platform/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

mod select;

#[cfg(feature = "tokio")]
pub mod tokio;

Expand Down
Loading

0 comments on commit 69e2671

Please sign in to comment.