Skip to content

Commit

Permalink
chore: improve flaky tests for wasi http
Browse files Browse the repository at this point in the history
  • Loading branch information
eduardomourar committed Aug 26, 2023
1 parent 09a16ed commit 61dec00
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 113 deletions.
1 change: 1 addition & 0 deletions crates/test-programs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ http-body-util = "0.1.0-rc.2"
hyper = { version = "1.0.0-rc.3", features = ["full"] }
is-terminal = { workspace = true }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "macros"] }
tracing = { workspace = true }

[dev-dependencies]
anyhow = { workspace = true }
Expand Down
207 changes: 150 additions & 57 deletions crates/test-programs/src/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::{body::Bytes, service::service_fn, Request, Response};
use std::{
net::{SocketAddr, TcpListener, TcpStream},
sync::OnceLock,
future::Future,
net::{SocketAddr, TcpListener},
sync::{mpsc, OnceLock},
};

async fn test(
mut req: Request<hyper::body::Incoming>,
) -> http::Result<Response<BoxBody<Bytes, std::convert::Infallible>>> {
tracing::debug!("preparing mocked response",);
let method = req.method().to_string();
let body = req.body_mut().collect().await.unwrap();
let buf = body.to_bytes();
tracing::trace!(
"hyper request body {:?}",
std::str::from_utf8(&buf[..]).unwrap()
);

Response::builder()
.status(http::StatusCode::OK)
Expand All @@ -19,12 +25,61 @@ async fn test(
.body(Full::<Bytes>::from(buf).boxed())
}

async fn serve_http1_connection(stream: TcpStream) -> Result<(), hyper::Error> {
let mut builder = hyper::server::conn::http1::Builder::new();
let http = builder.keep_alive(false).pipeline_flush(true);
stream.set_nonblocking(true).unwrap();
let io = tokio::net::TcpStream::from_std(stream).unwrap();
http.serve_connection(io, service_fn(test)).await
struct ServerHttp1 {
receiver: mpsc::Receiver<anyhow::Result<()>>,
}

impl ServerHttp1 {
fn new() -> Self {
tracing::debug!("initializing http1 server");
static CELL_HTTP1: OnceLock<TcpListener> = OnceLock::new();
let listener = CELL_HTTP1.get_or_init(|| {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("preparing tcp listener at localhost:3000");
TcpListener::bind(addr).unwrap()
});
let (sender, receiver) = mpsc::channel::<anyhow::Result<()>>();
std::thread::spawn(move || {
tracing::debug!("dedicated thread to start listening");
match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => {
tracing::debug!("using tokio runtime");
sender
.send(rt.block_on(async move {
tracing::debug!("preparing to accept connection");
let (stream, _) = listener.accept().map_err(anyhow::Error::from)?;
tracing::trace!("tcp stream {:?}", stream);

let mut builder = hyper::server::conn::http1::Builder::new();
let http = builder.keep_alive(false).pipeline_flush(true);
let io = tokio::net::TcpStream::from_std(stream)
.map_err(anyhow::Error::from)?;

tracing::debug!("preparing to bind connection to service");
let conn = http.serve_connection(io, service_fn(test)).await;
tracing::trace!("connection result {:?}", conn);
conn.map_err(anyhow::Error::from)
}))
.expect("value sent from http1 server dedicated thread");
}
Err(e) => {
tracing::debug!("unable to start tokio runtime");
sender.send(Err(anyhow::Error::from(e))).unwrap()
}
};
});
Self { receiver }
}

fn shutdown(self) -> anyhow::Result<()> {
tracing::debug!("shuting down http2 server");
self.receiver
.recv()
.expect("value received from http1 server dedicated thread")
}
}

#[derive(Clone)]
Expand All @@ -33,64 +88,102 @@ pub struct TokioExecutor;

impl<F> hyper::rt::Executor<F> for TokioExecutor
where
F: std::future::Future + Send + 'static,
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn(fut);
}
}

async fn serve_http2_connection(stream: TcpStream) -> Result<(), hyper::Error> {
let mut builder = hyper::server::conn::http2::Builder::new(TokioExecutor);
let http = builder.max_concurrent_streams(20);
let io = tokio::net::TcpStream::from_std(stream).unwrap();
http.serve_connection(io, service_fn(test)).await
struct ServerHttp2 {
receiver: mpsc::Receiver<anyhow::Result<()>>,
}

impl ServerHttp2 {
fn new() -> Self {
tracing::debug!("initializing http2 server");
static CELL_HTTP2: OnceLock<TcpListener> = OnceLock::new();
let listener = CELL_HTTP2.get_or_init(|| {
let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
tracing::debug!("preparing tcp listener at localhost:3001");
TcpListener::bind(addr).unwrap()
});
let (sender, receiver) = mpsc::channel::<anyhow::Result<()>>();
std::thread::spawn(move || {
tracing::debug!("dedicated thread to start listening");
match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => {
tracing::debug!("using tokio runtime");
sender
.send(rt.block_on(async move {
tracing::debug!("preparing to accept incoming connection");
let (stream, _) = listener.accept().map_err(anyhow::Error::from)?;
tracing::trace!("tcp stream {:?}", stream);

let mut builder =
hyper::server::conn::http2::Builder::new(TokioExecutor);
let http = builder.max_concurrent_streams(20);
let io = tokio::net::TcpStream::from_std(stream)
.map_err(anyhow::Error::from)?;

tracing::debug!("preparing to bind connection to service");
let conn = http.serve_connection(io, service_fn(test)).await;
tracing::trace!("connection result {:?}", conn);
if let Err(e) = &conn {
let message = e.to_string();
if message.contains("connection closed before reading preface")
|| message.contains("unspecific protocol error detected")
{
return Ok(());
}
}
conn.map_err(anyhow::Error::from)
}))
.expect("value sent from http2 server dedicated thread");
}
Err(e) => {
tracing::debug!("unable to start tokio runtime");
sender.send(Err(anyhow::Error::from(e))).unwrap()
}
};
});
Self { receiver }
}

fn shutdown(self) -> anyhow::Result<()> {
tracing::debug!("shuting down http2 server");
self.receiver
.recv()
.expect("value received from http2 server dedicated thread")
}
}

pub async fn setup_http1(
future: impl std::future::Future<Output = anyhow::Result<()>>,
) -> Result<(), anyhow::Error> {
static CELL_HTTP1: OnceLock<TcpListener> = OnceLock::new();
let listener = CELL_HTTP1.get_or_init(|| {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
TcpListener::bind(addr).unwrap()
});

let thread = tokio::task::spawn(async move {
let (stream, _) = listener.accept().unwrap();
let conn = serve_http1_connection(stream).await;
if let Err(err) = conn {
eprintln!("Error serving connection: {:?}", err);
}
});

let (future_result, thread_result) = tokio::join!(future, thread);
future_result?;
thread_result.unwrap();

Ok(())
pub async fn setup_http1(f: impl Future<Output = anyhow::Result<()>>) -> anyhow::Result<()> {
tracing::debug!("preparing http1 server asynchronously");
let server = ServerHttp1::new();

tracing::debug!("running inner function (future)");
let result = f.await;

if let Err(err) = server.shutdown() {
tracing::error!("[host/server] failure {:?}", err);
}
result
}

pub async fn setup_http2(
future: impl std::future::Future<Output = anyhow::Result<()>>,
) -> anyhow::Result<()> {
static CELL_HTTP2: OnceLock<TcpListener> = OnceLock::new();
let listener = CELL_HTTP2.get_or_init(|| {
let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
TcpListener::bind(addr).unwrap()
});
let thread = tokio::task::spawn(async move {
let (stream, _) = listener.accept().unwrap();
let conn = serve_http2_connection(stream).await;
if let Err(err) = conn {
eprintln!("Error serving connection: {:?}", err);
}
});

let (future_result, thread_result) = tokio::join!(future, thread);
future_result?;
thread_result.unwrap();

Ok(())
pub async fn setup_http2(f: impl Future<Output = anyhow::Result<()>>) -> anyhow::Result<()> {
tracing::debug!("preparing http2 server asynchronously");
let server = ServerHttp2::new();

tracing::debug!("running inner function (future)");
let result = f.await;

if let Err(err) = server.shutdown() {
tracing::error!("[host/server] Failure: {:?}", err);
}
result
}
61 changes: 43 additions & 18 deletions crates/test-programs/tests/wasi-http-components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use wasmtime::{
};
use wasmtime_wasi::preview2::{
command::{add_to_linker, Command},
Table, WasiCtx, WasiCtxBuilder, WasiView,
pipe::MemoryOutputPipe,
IsATTY, Table, WasiCtx, WasiCtxBuilder, WasiView,
};
use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};

Expand Down Expand Up @@ -69,23 +70,47 @@ async fn instantiate_component(
}

async fn run(name: &str) -> anyhow::Result<()> {
let mut table = Table::new();
let component = get_component(name);

// Create our wasi context.
let wasi = WasiCtxBuilder::new()
.inherit_stdio()
.arg(name)
.build(&mut table)?;
let http = WasiHttpCtx::new();

let (mut store, command) = instantiate_component(component, Ctx { table, wasi, http }).await?;
command
.wasi_cli_run()
.call_run(&mut store)
.await
.map_err(|e| anyhow::anyhow!("wasm failed with {e:?}"))?
.map_err(|e| anyhow::anyhow!("command returned with failing exit status {e:?}"))
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let r = {
let mut table = Table::new();
let component = get_component(name);

// Create our wasi context.
let mut builder = WasiCtxBuilder::new();
builder.stdout(stdout.clone(), IsATTY::No);
builder.stderr(stderr.clone(), IsATTY::No);
builder.arg(name);
for (var, val) in test_programs::wasi_tests_environment() {
builder.env(var, val);
}
let wasi = builder.build(&mut table)?;
let http = WasiHttpCtx::new();

let (mut store, command) =
instantiate_component(component, Ctx { table, wasi, http }).await?;
command
.wasi_cli_run()
.call_run(&mut store)
.await?
.map_err(|()| anyhow::anyhow!("run returned a failure"))?;
Ok(())
};
r.map_err(move |trap: anyhow::Error| {
let stdout = stdout.try_into_inner().expect("single ref to stdout");
if !stdout.is_empty() {
println!("[guest] stdout:\n{}\n===", String::from_utf8_lossy(&stdout));
}
let stderr = stderr.try_into_inner().expect("single ref to stderr");
if !stderr.is_empty() {
println!("[guest] stderr:\n{}\n===", String::from_utf8_lossy(&stderr));
}
trap.context(format!(
"error while testing wasi-tests {} with http-components",
name
))
})?;
Ok(())
}

#[test_log::test(tokio::test(flavor = "multi_thread"))]
Expand Down
Loading

0 comments on commit 61dec00

Please sign in to comment.