Skip to content

Commit

Permalink
WASI preview 2 output-streams: new backpressure and flushing design (b…
Browse files Browse the repository at this point in the history
…ytecodealliance#6877)

* Stream backpressure v2

Co-authored-by: Pat Hickey <[email protected]>
Co-authored-by: Trevor Elliott <[email protected]>
Co-authored-by: Dan Gohman <[email protected]>

Stop testing pseudocode

Restructure when notifications are sent, and make sure to flush the writer

Fix the wasi-http module versions of flush and blocking_flush

Use blocking_write_and_flush for blocking writes in the adapters

Fix a warning in wasi-http

Remove an unused DropPollable

add comment explaining try_write for tcpstream

refactor: separate struct for representing TcpReadStream

by factoring into HostTcpSocket a little bit

tcp read stream: handle stream closing

tcp tests: use blocking_read where its expecting to wait for input

move common test body into wasi-sockets-tests/src/lib.rs

ensure parent socket outlives pollable

input and output streams can be children now

tcp's streams are the sockets children

tcp.wit: document child relationships

tcp tests: fix to drop socket after its child streams

review feedback: propogate worker task panic

style

error source fix

tcp: use preview2::spawn, and propogate worker panics

join handle await always propogates panic

background task handles ewouldblock as well

document choice of constant

* sync wit notes into wasi-http

* improve wit docs for output-stream

* doc: document `HostOutputStream` (bytecodealliance#6980)

* doc: document `HostOutputStream`

Signed-off-by: Roman Volosatovs <[email protected]>
Co-authored-by: Pat Hickey <[email protected]>

* fix(wasi): fail when `MemoryOutputStream` buffer is full

Signed-off-by: Roman Volosatovs <[email protected]>

---------

Signed-off-by: Roman Volosatovs <[email protected]>
Co-authored-by: Pat Hickey <[email protected]>

* rustfmt

prtest:full

* windows and doc fixes

* cli test wasi-http: use blocking-write-and-flush

* Disable some tests, and adjust timeouts when running under qemu

* Try to reproduce the riscv64 failures

* Update riscv to LLVM 17 with beta rust

* Revert "Try to reproduce the riscv64 failures"

This reverts commit 8ac6781.

* Pin the beta version for riscv64

* Fix a warning on nightly

---------

Signed-off-by: Roman Volosatovs <[email protected]>
Co-authored-by: Roman Volosatovs <[email protected]>
Co-authored-by: Trevor Elliott <[email protected]>
Co-authored-by: Alex Crichton <[email protected]>
  • Loading branch information
4 people authored and eduardomourar committed Sep 13, 2023
1 parent f9a9741 commit 90a0a7a
Show file tree
Hide file tree
Showing 39 changed files with 2,039 additions and 1,287 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ futures = { version = "0.3.27", default-features = false }
indexmap = "2.0.0"
pretty_env_logger = "0.5.0"
syn = "2.0.25"
test-log = { version = "0.2", default-features = false, features = ["trace"] }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ['fmt', 'env-filter'] }

[features]
default = [
Expand Down
8 changes: 7 additions & 1 deletion ci/build-test-matrix.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ const array = [
"qemu_target": "riscv64-linux-user",
"name": "Test Linux riscv64",
"filter": "linux-riscv64",
"isa": "riscv64"
"isa": "riscv64",
// There appears to be a miscompile in Rust 1.72 for riscv64 where
// wasmtime-wasi tests are segfaulting in CI with the stack pointing in
// Tokio. Updating rustc seems to do the trick, so without doing a full
// rigorous investigation this uses beta for now but Rust 1.73 should be
// good to go for this.
"rust": "beta-2023-09-10",
}
];

Expand Down
7 changes: 2 additions & 5 deletions crates/test-programs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ tracing = { workspace = true }
[dev-dependencies]
anyhow = { workspace = true }
tempfile = { workspace = true }
test-log = { version = "0.2", default-features = false, features = ["trace"] }
test-log = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.1", default-features = false, features = [
'fmt',
'env-filter',
] }
tracing-subscriber = { workspace = true }
lazy_static = "1"
wasmtime = { workspace = true, features = ['cranelift', 'component-model'] }

Expand Down
42 changes: 41 additions & 1 deletion crates/test-programs/reactor-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,21 @@ wit_bindgen::generate!({
});

struct T;
use wasi::io::streams;
use wasi::poll::poll;

static mut STATE: Vec<String> = Vec::new();

struct DropPollable {
pollable: poll::Pollable,
}

impl Drop for DropPollable {
fn drop(&mut self) {
poll::drop_pollable(self.pollable);
}
}

impl Guest for T {
fn add_strings(ss: Vec<String>) -> u32 {
for s in ss {
Expand All @@ -28,10 +40,38 @@ impl Guest for T {
}

fn write_strings_to(o: OutputStream) -> Result<(), ()> {
let sub = DropPollable {
pollable: streams::subscribe_to_output_stream(o),
};
unsafe {
for s in STATE.iter() {
wasi::io::streams::write(o, s.as_bytes()).map_err(|_| ())?;
let mut out = s.as_bytes();
while !out.is_empty() {
poll::poll_oneoff(&[sub.pollable]);
let n = match streams::check_write(o) {
Ok(n) => n,
Err(_) => return Err(()),
};

let len = (n as usize).min(out.len());
match streams::write(o, &out[..len]) {
Ok(_) => out = &out[len..],
Err(_) => return Err(()),
}
}
}

match streams::flush(o) {
Ok(_) => {}
Err(_) => return Err(()),
}

poll::poll_oneoff(&[sub.pollable]);
match streams::check_write(o) {
Ok(_) => {}
Err(_) => return Err(()),
}

Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/test-programs/tests/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async fn reactor_tests() -> Result<()> {
// `host` and `wasi-common` crate.
// Note, this works because of the add_to_linker invocations using the
// `host` crate for `streams`, not because of `with` in the bindgen macro.
let writepipe = preview2::pipe::MemoryOutputPipe::new();
let writepipe = preview2::pipe::MemoryOutputPipe::new(4096);
let table_ix = preview2::TableStreamExt::push_output_stream(
store.data_mut().table_mut(),
Box::new(writepipe.clone()),
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-http-components-sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ fn instantiate_component(
}

fn run(name: &str) -> anyhow::Result<()> {
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut table = Table::new();
let component = get_component(name);
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-http-components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ async fn instantiate_component(
}

async fn run(name: &str) -> anyhow::Result<()> {
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut table = Table::new();
let component = get_component(name);
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-http-modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ async fn instantiate_module(module: Module, ctx: Ctx) -> Result<(Store<Ctx>, Fun
}

async fn run(name: &str) -> anyhow::Result<()> {
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut table = Table::new();
let module = get_module(name);
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-preview1-host-in-preview2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub fn prepare_workspace(exe_name: &str) -> Result<TempDir> {

async fn run(name: &str, inherit_stdio: bool) -> Result<()> {
let workspace = prepare_workspace(name)?;
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut linker = Linker::new(&ENGINE);
add_to_linker_async(&mut linker)?;
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-preview2-components-sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub fn prepare_workspace(exe_name: &str) -> Result<TempDir> {

fn run(name: &str, inherit_stdio: bool) -> Result<()> {
let workspace = prepare_workspace(name)?;
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut linker = Linker::new(&ENGINE);
add_to_linker(&mut linker)?;
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-preview2-components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub fn prepare_workspace(exe_name: &str) -> Result<TempDir> {

async fn run(name: &str, inherit_stdio: bool) -> Result<()> {
let workspace = prepare_workspace(name)?;
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut linker = Linker::new(&ENGINE);
add_to_linker(&mut linker)?;
Expand Down
58 changes: 40 additions & 18 deletions crates/test-programs/wasi-http-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod bindings {
});
}

use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, Result};
use std::fmt;
use std::sync::OnceLock;

Expand Down Expand Up @@ -42,6 +42,16 @@ impl Response {
}
}

struct DropPollable {
pollable: poll::Pollable,
}

impl Drop for DropPollable {
fn drop(&mut self) {
poll::drop_pollable(self.pollable);
}
}

pub async fn request(
method: http_types::Method,
scheme: http_types::Scheme,
Expand Down Expand Up @@ -72,27 +82,39 @@ pub async fn request(
let request_body = http_types::outgoing_request_write(request)
.map_err(|_| anyhow!("outgoing request write failed"))?;

if let Some(body) = body {
let output_stream_pollable = streams::subscribe_to_output_stream(request_body);
let len = body.len();
if len == 0 {
let (_written, _status) = streams::write(request_body, &[])
.map_err(|_| anyhow!("request_body stream write failed"))
.context("writing empty request body")?;
} else {
let mut body_cursor = 0;
while body_cursor < body.len() {
let (written, _status) = streams::write(request_body, &body[body_cursor..])
.map_err(|_| anyhow!("request_body stream write failed"))
.context("writing request body")?;
body_cursor += written as usize;
if let Some(mut buf) = body {
let sub = DropPollable {
pollable: streams::subscribe_to_output_stream(request_body),
};
while !buf.is_empty() {
poll::poll_oneoff(&[sub.pollable]);

let permit = match streams::check_write(request_body) {
Ok(n) => usize::try_from(n)?,
Err(_) => anyhow::bail!("output stream error"),
};

let len = buf.len().min(permit);
let (chunk, rest) = buf.split_at(len);
buf = rest;

match streams::write(request_body, chunk) {
Err(_) => anyhow::bail!("output stream error"),
_ => {}
}
}

// TODO: enable when working as expected
// let _ = poll::poll_oneoff(&[output_stream_pollable]);
match streams::flush(request_body) {
Err(_) => anyhow::bail!("output stream error"),
_ => {}
}

poll::poll_oneoff(&[sub.pollable]);

poll::drop_pollable(output_stream_pollable);
match streams::check_write(request_body) {
Ok(_) => {}
Err(_) => anyhow::bail!("output stream error"),
};
}

let future_response = outgoing_handler::handle(request, None);
Expand Down
93 changes: 4 additions & 89 deletions crates/test-programs/wasi-sockets-tests/src/bin/tcp_v4.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
//! A simple TCP testcase, using IPv4.
use wasi::io::streams;
use wasi::poll::poll;
use wasi::sockets::network::{IpAddressFamily, IpSocketAddress, Ipv4SocketAddress};
use wasi::sockets::{instance_network, network, tcp, tcp_create_socket};
use wasi::sockets::{instance_network, tcp, tcp_create_socket};
use wasi_sockets_tests::*;

fn wait(sub: poll::Pollable) {
loop {
let wait = poll::poll_oneoff(&[sub]);
if wait[0] {
break;
}
}
}

fn main() {
let first_message = b"Hello, world!";
let second_message = b"Greetings, planet!";

let net = instance_network::instance_network();

let sock = tcp_create_socket::create_tcp_socket(IpAddressFamily::Ipv4).unwrap();
Expand All @@ -31,82 +17,11 @@ fn main() {
let sub = tcp::subscribe(sock);

tcp::start_bind(sock, net, addr).unwrap();
wait(sub);
tcp::finish_bind(sock).unwrap();

tcp::start_listen(sock).unwrap();
wait(sub);
tcp::finish_listen(sock).unwrap();

let addr = tcp::local_address(sock).unwrap();

let client = tcp_create_socket::create_tcp_socket(IpAddressFamily::Ipv4).unwrap();
let client_sub = tcp::subscribe(client);

tcp::start_connect(client, net, addr).unwrap();
wait(client_sub);
let (client_input, client_output) = tcp::finish_connect(client).unwrap();

let (n, status) = streams::write(client_output, &[]).unwrap();
assert_eq!(n, 0);
assert_eq!(status, streams::StreamStatus::Open);

let (n, status) = streams::write(client_output, first_message).unwrap();
assert_eq!(n, first_message.len() as u64); // Not guaranteed to work but should work in practice.
assert_eq!(status, streams::StreamStatus::Open);

streams::drop_input_stream(client_input);
streams::drop_output_stream(client_output);
poll::drop_pollable(client_sub);
tcp::drop_tcp_socket(client);

wait(sub);
let (accepted, input, output) = tcp::accept(sock).unwrap();
wasi::poll::poll::drop_pollable(sub);

let (empty_data, status) = streams::read(input, 0).unwrap();
assert!(empty_data.is_empty());
assert_eq!(status, streams::StreamStatus::Open);

let (data, status) = streams::blocking_read(input, first_message.len() as u64).unwrap();
assert_eq!(status, streams::StreamStatus::Open);

tcp::drop_tcp_socket(accepted);
streams::drop_input_stream(input);
streams::drop_output_stream(output);

// Check that we sent and recieved our message!
assert_eq!(data, first_message); // Not guaranteed to work but should work in practice.

// Another client
let client = tcp_create_socket::create_tcp_socket(IpAddressFamily::Ipv4).unwrap();
let client_sub = tcp::subscribe(client);

tcp::start_connect(client, net, addr).unwrap();
wait(client_sub);
let (client_input, client_output) = tcp::finish_connect(client).unwrap();

let (n, status) = streams::write(client_output, second_message).unwrap();
assert_eq!(n, second_message.len() as u64); // Not guaranteed to work but should work in practice.
assert_eq!(status, streams::StreamStatus::Open);

streams::drop_input_stream(client_input);
streams::drop_output_stream(client_output);
poll::drop_pollable(client_sub);
tcp::drop_tcp_socket(client);

wait(sub);
let (accepted, input, output) = tcp::accept(sock).unwrap();
let (data, status) = streams::blocking_read(input, second_message.len() as u64).unwrap();
assert_eq!(status, streams::StreamStatus::Open);

streams::drop_input_stream(input);
streams::drop_output_stream(output);
tcp::drop_tcp_socket(accepted);

// Check that we sent and recieved our message!
assert_eq!(data, second_message); // Not guaranteed to work but should work in practice.
tcp::finish_bind(sock).unwrap();

poll::drop_pollable(sub);
tcp::drop_tcp_socket(sock);
network::drop_network(net);
example_body(net, sock, IpAddressFamily::Ipv4)
}
Loading

0 comments on commit 90a0a7a

Please sign in to comment.