Skip to content

Commit

Permalink
feat: add experimental turmoil IO provider and example (#1663)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Mar 29, 2023
1 parent deded3d commit 1ab3385
Show file tree
Hide file tree
Showing 11 changed files with 680 additions and 1 deletion.
14 changes: 14 additions & 0 deletions examples/turmoil-provider/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "turmoil-provider"
version = "0.1.0"
authors = ["AWS s2n"]
edition = "2021"

[dependencies]
s2n-quic = { version = "1", path = "../../quic/s2n-quic", features = ["provider-event-tracing", "unstable-provider-io-turmoil"] }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
turmoil = { version = "0.5.2" }

[workspace]
members = ["."]
3 changes: 3 additions & 0 deletions examples/turmoil-provider/rust-toolchain
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[toolchain]
channel = "1.66.0"
components = ["rustc", "clippy", "rustfmt"]
161 changes: 161 additions & 0 deletions examples/turmoil-provider/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

#![cfg(test)]

use s2n_quic::{
client::Connect,
provider::{event, io},
Client, Server,
};
use std::net::SocketAddr;
use turmoil::{lookup, Builder, Result};

/// NOTE: this certificate is to be used for demonstration purposes only!
pub static CERT_PEM: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/../../quic/s2n-quic-core/certs/cert.pem"
));
/// NOTE: this certificate is to be used for demonstration purposes only!
pub static KEY_PEM: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/../../quic/s2n-quic-core/certs/key.pem"
));

#[test]
fn lossy_handshake() -> Result {
let mut sim = Builder::new()
.simulation_duration(core::time::Duration::from_secs(20))
.build();

sim.host("server", || async move {
let io = io::turmoil::Builder::default()
.with_address(bind_to(443))?
.build()?;

let mut server = Server::builder()
.with_io(io)?
.with_tls((CERT_PEM, KEY_PEM))?
.with_event(events())?
.start()?;

while let Some(mut connection) = server.accept().await {
tokio::spawn(async move {
eprintln!("Connection accepted from {:?}", connection.remote_addr());

while let Ok(Some(mut stream)) = connection.accept_bidirectional_stream().await {
tokio::spawn(async move {
eprintln!("Stream opened from {:?}", stream.connection().remote_addr());

// echo any data back to the stream
while let Ok(Some(data)) = stream.receive().await {
stream.send(data).await.expect("stream should be open");
}
});
}
});
}

Ok(())
});

sim.client("client", async move {
let io = io::turmoil::Builder::default()
.with_address(bind_to(1234))?
.build()?;

let client = Client::builder()
.with_io(io)?
.with_tls(CERT_PEM)?
.with_event(events())?
.start()?;

// drop packets for 1 second
drop_for(1);

// even though we're dropping packets, the connection still goes through
let server_addr: SocketAddr = (lookup("server"), 443).into();
let mut connection = client
.connect(Connect::new(server_addr).with_server_name("localhost"))
.await?;

// drop packets for 5 seconds
drop_for(5);

// even though we're dropping packets, the stream should still complete
let mut stream = connection.open_bidirectional_stream().await?;
stream.send(vec![1, 2, 3].into()).await?;
stream.finish()?;

let response = stream.receive().await?.unwrap();
assert_eq!(&response[..], &[1, 2, 3]);

Ok(())
});

sim.run()?;

Ok(())
}

pub fn events() -> event::tracing::Provider {
use std::sync::Once;

static TRACING: Once = Once::new();

// make sure this only gets initialized once
TRACING.call_once(|| {
use tokio::time::Instant;

struct TokioUptime {
epoch: Instant,
}

impl Default for TokioUptime {
fn default() -> Self {
Self {
epoch: Instant::now(),
}
}
}

impl tracing_subscriber::fmt::time::FormatTime for TokioUptime {
fn format_time(
&self,
w: &mut tracing_subscriber::fmt::format::Writer,
) -> std::fmt::Result {
write!(w, "{:?}", self.epoch.elapsed())
}
}

let format = tracing_subscriber::fmt::format()
.with_level(false) // don't include levels in formatted output
.with_timer(TokioUptime::default())
.with_ansi(false)
.compact(); // Use a less verbose output format.

tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::new("trace"))
.event_format(format)
.with_test_writer()
.init();
});

event::tracing::Provider::default()
}

fn bind_to(port: u16) -> SocketAddr {
(std::net::Ipv4Addr::UNSPECIFIED, port).into()
}

fn drop_for(secs: u64) {
turmoil::partition("client", "server");
tokio::spawn(async move {
sleep_ms(secs * 1000).await;
turmoil::repair("client", "server");
});
}

async fn sleep_ms(millis: u64) {
tokio::time::sleep(core::time::Duration::from_millis(millis)).await
}
1 change: 1 addition & 0 deletions quic/s2n-quic-platform/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pin-project = { version = "1", optional = true }
s2n-quic-core = { version = "=0.18.1", path = "../s2n-quic-core", default-features = false }
socket2 = { version = "0.4", features = ["all"], optional = true }
tokio = { version = "1", default-features = false, features = ["macros", "net", "rt", "time"], optional = true }
turmoil = { version = "0.5.2", optional = true }
zeroize = { version = "1", default-features = false }

[target.'cfg(unix)'.dependencies]
Expand Down
3 changes: 3 additions & 0 deletions quic/s2n-quic-platform/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ pub mod tokio;

#[cfg(any(test, feature = "io-testing"))]
pub mod testing;

#[cfg(feature = "turmoil")]
pub mod turmoil;
2 changes: 1 addition & 1 deletion quic/s2n-quic-platform/src/io/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::{net::UdpSocket, runtime::Handle};
pub type PathHandle = socket::Handle;

mod clock;
use clock::Clock;
pub(crate) use clock::Clock;

impl crate::socket::std::Socket for UdpSocket {
type Error = io::Error;
Expand Down
Loading

0 comments on commit 1ab3385

Please sign in to comment.