From d9e3a444e918e7a3ab86f5302f491f8e31892f1d Mon Sep 17 00:00:00 2001 From: Eduardo Rodrigues Date: Sat, 26 Aug 2023 01:56:45 +0200 Subject: [PATCH] feat(wasi-http): expose sync api for components --- crates/wasi-http/src/component_impl.rs | 11 + crates/wasi-http/src/http_impl.rs | 52 ++++- crates/wasi-http/src/incoming_handler.rs | 18 ++ crates/wasi-http/src/lib.rs | 31 ++- crates/wasi-http/src/proxy.rs | 41 +++- crates/wasi-http/src/types_impl.rs | 286 +++++++++++++++++++++++ 6 files changed, 430 insertions(+), 9 deletions(-) diff --git a/crates/wasi-http/src/component_impl.rs b/crates/wasi-http/src/component_impl.rs index ef32849d964b..98d9ba67d5df 100644 --- a/crates/wasi-http/src/component_impl.rs +++ b/crates/wasi-http/src/component_impl.rs @@ -618,3 +618,14 @@ pub fn add_component_to_linker( )?; Ok(()) } + +pub mod sync { + use crate::WasiHttpView; + + pub fn add_component_to_linker( + linker: &mut wasmtime::Linker, + get_cx: impl Fn(&mut T) -> &mut T + Send + Sync + Copy + 'static, + ) -> anyhow::Result<()> { + unimplemented!("linker synchronous version") + } +} diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index 767fed4f3a53..b1a354ed0e5d 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -32,6 +32,40 @@ impl crate::bindings::http::outgoing_handler::Host for T { } } +#[cfg(feature = "sync")] +pub mod sync { + use crate::bindings::http::outgoing_handler::{ + Host as AsyncHost, RequestOptions as AsyncRequestOptions, + }; + use crate::bindings::sync::http::types::{ + FutureIncomingResponse, OutgoingRequest, RequestOptions, + }; + use crate::WasiHttpView; + use wasmtime_wasi::preview2::in_tokio; + + // same boilerplate everywhere, converting between two identical types with different + // definition sites. one day wasmtime-wit-bindgen will make all this unnecessary + impl From for AsyncRequestOptions { + fn from(other: RequestOptions) -> Self { + Self { + connect_timeout_ms: other.connect_timeout_ms, + first_byte_timeout_ms: other.first_byte_timeout_ms, + between_bytes_timeout_ms: other.between_bytes_timeout_ms, + } + } + } + + impl crate::bindings::sync::http::outgoing_handler::Host for T { + fn handle( + &mut self, + request_id: OutgoingRequest, + options: Option, + ) -> wasmtime::Result { + in_tokio(async { AsyncHost::handle(self, request_id, options.map(|v| v.into())).await }) + } + } +} + fn port_for_scheme(scheme: &Option) -> &str { match scheme { Some(s) => match s { @@ -60,6 +94,7 @@ impl WasiHttpViewExt for T { request_id: OutgoingRequest, options: Option, ) -> wasmtime::Result { + tracing::debug!("preparing outgoing request"); let opts = options.unwrap_or( // TODO: Configurable defaults here? RequestOptions { @@ -80,6 +115,7 @@ impl WasiHttpViewExt for T { .get_request(request_id) .context("[handle_async] getting request")? .clone(); + tracing::debug!("http request retrieved from table"); let method = match request.method() { crate::bindings::http::types::Method::Get => Method::GET, @@ -119,6 +155,7 @@ impl WasiHttpViewExt for T { }; let tcp_stream = TcpStream::connect(authority.clone()).await?; let mut sender = if scheme == "https://" { + tracing::debug!("initiating client connection client with TLS"); #[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))] { //TODO: uncomment this code and make the tls implementation a feature decision. @@ -158,7 +195,7 @@ impl WasiHttpViewExt for T { let (s, conn) = t?; tokio::task::spawn(async move { if let Err(err) = conn.await { - println!("Connection failed: {:?}", err); + println!("[host/client] Connection failed: {:?}", err); } }); s @@ -168,6 +205,7 @@ impl WasiHttpViewExt for T { "unsupported architecture for SSL".to_string(), )); } else { + tracing::debug!("initiating client connection without TLS"); let t = timeout( connect_timeout, hyper::client::conn::http1::handshake(tcp_stream), @@ -176,7 +214,7 @@ impl WasiHttpViewExt for T { let (s, conn) = t?; tokio::task::spawn(async move { if let Err(err) = conn.await { - println!("Connection failed: {:?}", err); + println!("[host/client] Connection failed: {:?}", err); } }); s @@ -184,6 +222,7 @@ impl WasiHttpViewExt for T { let url = scheme.to_owned() + &request.authority() + &request.path_with_query(); + tracing::debug!("request to url {:?}", &url); let mut call = Request::builder() .method(method) .uri(url) @@ -227,8 +266,11 @@ impl WasiHttpViewExt for T { } None => Empty::::new().boxed(), }; - let t = timeout(first_bytes_timeout, sender.send_request(call.body(body)?)).await?; + let request = call.body(body)?; + tracing::trace!("hyper request {:?}", request); + let t = timeout(first_bytes_timeout, sender.send_request(request)).await?; let mut res = t?; + tracing::trace!("hyper response {:?}", res); response.status = res.status().as_u16(); let mut map = ActiveFields::new(); @@ -246,10 +288,12 @@ impl WasiHttpViewExt for T { let mut buf: Vec = Vec::new(); while let Some(next) = timeout(between_bytes_timeout, res.frame()).await? { let frame = next?; + tracing::debug!("response body next frame"); if let Some(chunk) = frame.data_ref() { buf.extend_from_slice(chunk); } if let Some(trailers) = frame.trailers_ref() { + tracing::debug!("response trailers present"); let mut map = ActiveFields::new(); for (name, value) in trailers.iter() { let key = name.to_string(); @@ -267,6 +311,7 @@ impl WasiHttpViewExt for T { .push_fields(Box::new(map)) .context("[handle_async] pushing response trailers")?; response.set_trailers(trailers); + tracing::debug!("http trailers saved to table"); } } @@ -283,6 +328,7 @@ impl WasiHttpViewExt for T { .get_response_mut(response_id) .context("[handle_async] getting mutable response")?; response.set_body(stream_id); + tracing::debug!("http response saved to table with id {:?}", response_id); self.http_ctx_mut().streams.insert(stream_id, stream); diff --git a/crates/wasi-http/src/incoming_handler.rs b/crates/wasi-http/src/incoming_handler.rs index c40ec54f6a37..e65a88e27d53 100644 --- a/crates/wasi-http/src/incoming_handler.rs +++ b/crates/wasi-http/src/incoming_handler.rs @@ -11,3 +11,21 @@ impl crate::bindings::http::incoming_handler::Host for T { anyhow::bail!("unimplemented: [incoming_handler] handle") } } + +#[cfg(feature = "sync")] +pub mod sync { + use crate::bindings::http::incoming_handler::Host as AsyncHost; + use crate::bindings::sync::http::types::{IncomingRequest, ResponseOutparam}; + use crate::WasiHttpView; + use wasmtime_wasi::preview2::in_tokio; + + impl crate::bindings::sync::http::incoming_handler::Host for T { + fn handle( + &mut self, + request: IncomingRequest, + response_out: ResponseOutparam, + ) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::handle(self, request, response_out).await }) + } + } +} diff --git a/crates/wasi-http/src/lib.rs b/crates/wasi-http/src/lib.rs index 68471de1ad60..28f280fd7ed4 100644 --- a/crates/wasi-http/src/lib.rs +++ b/crates/wasi-http/src/lib.rs @@ -1,4 +1,3 @@ -use crate::component_impl::add_component_to_linker; pub use crate::http_impl::WasiHttpViewExt; pub use crate::types::{WasiHttpCtx, WasiHttpView}; use core::fmt::Formatter; @@ -12,6 +11,26 @@ pub mod types; pub mod types_impl; pub mod bindings { + #[cfg(feature = "sync")] + pub mod sync { + pub(crate) mod _internal { + wasmtime::component::bindgen!({ + path: "wit", + interfaces: " + import wasi:http/incoming-handler + import wasi:http/outgoing-handler + import wasi:http/types + ", + tracing: true, + with: { + "wasi:io/streams": wasmtime_wasi::preview2::bindings::sync_io::io::streams, + "wasi:poll/poll": wasmtime_wasi::preview2::bindings::sync_io::poll::poll, + } + }); + } + pub use self::_internal::wasi::http; + } + pub(crate) mod _internal_rest { wasmtime::component::bindgen!({ path: "wit", @@ -33,7 +52,15 @@ pub mod bindings { } pub fn add_to_linker(linker: &mut wasmtime::Linker) -> anyhow::Result<()> { - add_component_to_linker::(linker, |t| t) + crate::component_impl::add_component_to_linker::(linker, |t| t) +} + +pub mod sync { + use crate::r#struct::WasiHttpView; + + pub fn add_to_linker(linker: &mut wasmtime::Linker) -> anyhow::Result<()> { + crate::component_impl::sync::add_component_to_linker::(linker, |t| t) + } } impl std::error::Error for crate::bindings::http::types::Error {} diff --git a/crates/wasi-http/src/proxy.rs b/crates/wasi-http/src/proxy.rs index 5151166ede91..8cd78d700265 100644 --- a/crates/wasi-http/src/proxy.rs +++ b/crates/wasi-http/src/proxy.rs @@ -23,13 +23,46 @@ wasmtime::component::bindgen!({ pub fn add_to_linker(l: &mut wasmtime::component::Linker) -> anyhow::Result<()> where - T: WasiHttpView - + bindings::http::incoming_handler::Host - + bindings::http::outgoing_handler::Host - + bindings::http::types::Host, + T: WasiHttpView + bindings::http::types::Host, { bindings::http::incoming_handler::add_to_linker(l, |t| t)?; bindings::http::outgoing_handler::add_to_linker(l, |t| t)?; bindings::http::types::add_to_linker(l, |t| t)?; Ok(()) } + +#[cfg(feature = "sync")] +pub mod sync { + use crate::{bindings, WasiHttpView}; + use wasmtime_wasi::preview2; + + wasmtime::component::bindgen!({ + world: "wasi:http/proxy", + tracing: true, + async: false, + with: { + "wasi:cli/stderr": preview2::bindings::cli::stderr, + "wasi:cli/stdin": preview2::bindings::cli::stdin, + "wasi:cli/stdout": preview2::bindings::cli::stdout, + "wasi:clocks/monotonic-clock": preview2::bindings::clocks::monotonic_clock, + "wasi:clocks/timezone": preview2::bindings::clocks::timezone, + "wasi:clocks/wall-clock": preview2::bindings::clocks::wall_clock, + "wasi:http/incoming-handler": bindings::sync::http::incoming_handler, + "wasi:http/outgoing-handler": bindings::sync::http::outgoing_handler, + "wasi:http/types": bindings::sync::http::types, + "wasi:io/streams": preview2::bindings::sync_io::io::streams, + "wasi:poll/poll": preview2::bindings::sync_io::poll::poll, + "wasi:random/random": preview2::bindings::random::random, + }, + }); + + pub fn add_to_linker(l: &mut wasmtime::component::Linker) -> anyhow::Result<()> + where + T: WasiHttpView + bindings::sync::http::types::Host, + { + bindings::sync::http::incoming_handler::add_to_linker(l, |t| t)?; + bindings::sync::http::outgoing_handler::add_to_linker(l, |t| t)?; + bindings::sync::http::types::add_to_linker(l, |t| t)?; + Ok(()) + } +} diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 22df4b21dd9b..12ef09553661 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -399,3 +399,289 @@ impl crate::bindings::http::types::Host for T }) } } + +#[cfg(feature = "sync")] +pub mod sync { + use crate::bindings::http::types::{ + Error as AsyncError, Host as AsyncHost, Method as AsyncMethod, Scheme as AsyncScheme, + }; + use crate::bindings::sync::http::types::{ + Error, Fields, FutureIncomingResponse, Headers, IncomingRequest, IncomingResponse, + IncomingStream, Method, OutgoingRequest, OutgoingResponse, OutgoingStream, + ResponseOutparam, Scheme, StatusCode, Trailers, + }; + use crate::http_impl::WasiHttpViewExt; + use crate::WasiHttpView; + use wasmtime_wasi::preview2::{bindings::poll::poll::Pollable, in_tokio}; + + // same boilerplate everywhere, converting between two identical types with different + // definition sites. one day wasmtime-wit-bindgen will make all this unnecessary + impl From for Error { + fn from(other: AsyncError) -> Self { + match other { + AsyncError::InvalidUrl(v) => Self::InvalidUrl(v), + AsyncError::ProtocolError(v) => Self::ProtocolError(v), + AsyncError::TimeoutError(v) => Self::TimeoutError(v), + AsyncError::UnexpectedError(v) => Self::UnexpectedError(v), + } + } + } + + impl From for AsyncError { + fn from(other: Error) -> Self { + match other { + Error::InvalidUrl(v) => Self::InvalidUrl(v), + Error::ProtocolError(v) => Self::ProtocolError(v), + Error::TimeoutError(v) => Self::TimeoutError(v), + Error::UnexpectedError(v) => Self::UnexpectedError(v), + } + } + } + + impl From for Method { + fn from(other: AsyncMethod) -> Self { + match other { + AsyncMethod::Connect => Self::Connect, + AsyncMethod::Delete => Self::Delete, + AsyncMethod::Get => Self::Get, + AsyncMethod::Head => Self::Head, + AsyncMethod::Options => Self::Options, + AsyncMethod::Patch => Self::Patch, + AsyncMethod::Post => Self::Post, + AsyncMethod::Put => Self::Put, + AsyncMethod::Trace => Self::Trace, + AsyncMethod::Other(v) => Self::Other(v), + } + } + } + + impl From for AsyncMethod { + fn from(other: Method) -> Self { + match other { + Method::Connect => Self::Connect, + Method::Delete => Self::Delete, + Method::Get => Self::Get, + Method::Head => Self::Head, + Method::Options => Self::Options, + Method::Patch => Self::Patch, + Method::Post => Self::Post, + Method::Put => Self::Put, + Method::Trace => Self::Trace, + Method::Other(v) => Self::Other(v), + } + } + } + + impl From for Scheme { + fn from(other: AsyncScheme) -> Self { + match other { + AsyncScheme::Http => Self::Http, + AsyncScheme::Https => Self::Https, + AsyncScheme::Other(v) => Self::Other(v), + } + } + } + + impl From for AsyncScheme { + fn from(other: Scheme) -> Self { + match other { + Scheme::Http => Self::Http, + Scheme::Https => Self::Https, + Scheme::Other(v) => Self::Other(v), + } + } + } + + impl crate::bindings::sync::http::types::Host for T { + fn drop_fields(&mut self, fields: Fields) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::drop_fields(self, fields).await }) + } + fn new_fields(&mut self, entries: Vec<(String, String)>) -> wasmtime::Result { + in_tokio(async { AsyncHost::new_fields(self, entries).await }) + } + fn fields_get(&mut self, fields: Fields, name: String) -> wasmtime::Result>> { + in_tokio(async { AsyncHost::fields_get(self, fields, name).await }) + } + fn fields_set( + &mut self, + fields: Fields, + name: String, + value: Vec>, + ) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::fields_set(self, fields, name, value).await }) + } + fn fields_delete(&mut self, fields: Fields, name: String) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::fields_delete(self, fields, name).await }) + } + fn fields_append( + &mut self, + fields: Fields, + name: String, + value: Vec, + ) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::fields_append(self, fields, name, value).await }) + } + fn fields_entries(&mut self, fields: Fields) -> wasmtime::Result)>> { + in_tokio(async { AsyncHost::fields_entries(self, fields).await }) + } + fn fields_clone(&mut self, fields: Fields) -> wasmtime::Result { + in_tokio(async { AsyncHost::fields_clone(self, fields).await }) + } + fn finish_incoming_stream( + &mut self, + stream_id: IncomingStream, + ) -> wasmtime::Result> { + in_tokio(async { AsyncHost::finish_incoming_stream(self, stream_id).await }) + } + fn finish_outgoing_stream( + &mut self, + stream: OutgoingStream, + trailers: Option, + ) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::finish_outgoing_stream(self, stream, trailers).await }) + } + fn drop_incoming_request(&mut self, request: IncomingRequest) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::drop_incoming_request(self, request).await }) + } + fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::drop_outgoing_request(self, request).await }) + } + fn incoming_request_method( + &mut self, + request: IncomingRequest, + ) -> wasmtime::Result { + in_tokio(async { AsyncHost::incoming_request_method(self, request).await }) + .map(Method::from) + } + fn incoming_request_path_with_query( + &mut self, + request: IncomingRequest, + ) -> wasmtime::Result> { + in_tokio(async { AsyncHost::incoming_request_path_with_query(self, request).await }) + } + fn incoming_request_scheme( + &mut self, + request: IncomingRequest, + ) -> wasmtime::Result> { + Ok( + in_tokio(async { AsyncHost::incoming_request_scheme(self, request).await })? + .map(Scheme::from), + ) + } + fn incoming_request_authority( + &mut self, + request: IncomingRequest, + ) -> wasmtime::Result> { + in_tokio(async { AsyncHost::incoming_request_authority(self, request).await }) + } + fn incoming_request_headers( + &mut self, + request: IncomingRequest, + ) -> wasmtime::Result { + in_tokio(async { AsyncHost::incoming_request_headers(self, request).await }) + } + fn incoming_request_consume( + &mut self, + request: IncomingRequest, + ) -> wasmtime::Result> { + in_tokio(async { AsyncHost::incoming_request_consume(self, request).await }) + } + fn new_outgoing_request( + &mut self, + method: Method, + path_with_query: Option, + scheme: Option, + authority: Option, + headers: Headers, + ) -> wasmtime::Result { + in_tokio(async { + AsyncHost::new_outgoing_request( + self, + method.into(), + path_with_query, + scheme.map(AsyncScheme::from), + authority, + headers, + ) + .await + }) + } + fn outgoing_request_write( + &mut self, + request: OutgoingRequest, + ) -> wasmtime::Result> { + in_tokio(async { AsyncHost::outgoing_request_write(self, request).await }) + } + fn drop_response_outparam(&mut self, response: ResponseOutparam) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::drop_response_outparam(self, response).await }) + } + fn set_response_outparam( + &mut self, + outparam: ResponseOutparam, + response: Result, + ) -> wasmtime::Result> { + in_tokio(async { + AsyncHost::set_response_outparam(self, outparam, response.map_err(AsyncError::from)) + .await + }) + } + fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::drop_incoming_response(self, response).await }) + } + fn drop_outgoing_response(&mut self, response: OutgoingResponse) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::drop_outgoing_response(self, response).await }) + } + fn incoming_response_status( + &mut self, + response: IncomingResponse, + ) -> wasmtime::Result { + in_tokio(async { AsyncHost::incoming_response_status(self, response).await }) + } + fn incoming_response_headers( + &mut self, + response: IncomingResponse, + ) -> wasmtime::Result { + in_tokio(async { AsyncHost::incoming_response_headers(self, response).await }) + } + fn incoming_response_consume( + &mut self, + response: IncomingResponse, + ) -> wasmtime::Result> { + in_tokio(async { AsyncHost::incoming_response_consume(self, response).await }) + } + fn new_outgoing_response( + &mut self, + status_code: StatusCode, + headers: Headers, + ) -> wasmtime::Result { + in_tokio(async { AsyncHost::new_outgoing_response(self, status_code, headers).await }) + } + fn outgoing_response_write( + &mut self, + response: OutgoingResponse, + ) -> wasmtime::Result> { + in_tokio(async { AsyncHost::outgoing_response_write(self, response).await }) + } + fn drop_future_incoming_response( + &mut self, + future: FutureIncomingResponse, + ) -> wasmtime::Result<()> { + in_tokio(async { AsyncHost::drop_future_incoming_response(self, future).await }) + } + fn future_incoming_response_get( + &mut self, + future: FutureIncomingResponse, + ) -> wasmtime::Result>> { + Ok( + in_tokio(async { AsyncHost::future_incoming_response_get(self, future).await })? + .map(|v| v.map_err(Error::from)), + ) + } + fn listen_to_future_incoming_response( + &mut self, + future: FutureIncomingResponse, + ) -> wasmtime::Result { + in_tokio(async { AsyncHost::listen_to_future_incoming_response(self, future).await }) + } + } +}