Skip to content

Commit

Permalink
Fix a warning in wasi-http
Browse files Browse the repository at this point in the history
  • Loading branch information
elliottt committed Aug 31, 2023
1 parent 2c8ddcc commit 30b9cd5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
1 change: 1 addition & 0 deletions crates/wasi-http/src/http_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ impl<T: WasiHttpView> WasiHttpViewExt for T {
let (stream_id, stream) = self
.table_mut()
.push_stream(Bytes::from(buf), response_id)
.await
.context("[handle_async] pushing stream")?;
let response = self
.table_mut()
Expand Down
28 changes: 19 additions & 9 deletions crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ impl Stream {
}
}

#[async_trait::async_trait]
pub trait TableHttpExt {
fn push_request(&mut self, request: Box<dyn HttpRequest>) -> Result<u32, TableError>;
fn get_request(&self, id: u32) -> Result<&(dyn HttpRequest), TableError>;
Expand All @@ -308,12 +309,13 @@ pub trait TableHttpExt {
fn get_fields_mut(&mut self, id: u32) -> Result<&mut Box<ActiveFields>, TableError>;
fn delete_fields(&mut self, id: u32) -> Result<(), TableError>;

fn push_stream(&mut self, content: Bytes, parent: u32) -> Result<(u32, Stream), TableError>;
async fn push_stream(&mut self, content: Bytes, parent: u32) -> Result<(u32, Stream), TableError>;
fn get_stream(&self, id: u32) -> Result<&Stream, TableError>;
fn get_stream_mut(&mut self, id: u32) -> Result<&mut Box<Stream>, TableError>;
fn delete_stream(&mut self, id: u32) -> Result<(), TableError>;
}

#[async_trait::async_trait]
impl TableHttpExt for Table {
fn push_request(&mut self, request: Box<dyn HttpRequest>) -> Result<u32, TableError> {
self.push(Box::new(request))
Expand Down Expand Up @@ -367,21 +369,29 @@ impl TableHttpExt for Table {
self.delete::<Box<ActiveFields>>(id).map(|_old| ())
}

fn push_stream(&mut self, content: Bytes, parent: u32) -> Result<(u32, Stream), TableError> {
async fn push_stream(
&mut self,
mut content: Bytes,
parent: u32,
) -> Result<(u32, Stream), TableError> {
let (a, b) = tokio::io::duplex(MAX_BUF_SIZE);
let (_, write_stream) = tokio::io::split(a);
let (read_stream, _) = tokio::io::split(b);
let input_stream = AsyncReadStream::new(read_stream);
// TODO: more informed budget here
let mut output_stream = AsyncWriteStream::new(4096, write_stream);

let mut cursor = 0;
while cursor < content.len() {
// let (written, _) = output_stream
// .write(content.slice(cursor..content.len()))
// .map_err(|_| TableError::NotPresent)?;
// cursor += written;
todo!()
while !content.is_empty() {
let permit = output_stream
.write_ready()
.await
.map_err(|_| TableError::NotPresent)?;

let chunk = content.split_to(permit as usize);

output_stream
.write(chunk)
.map_err(|_| TableError::NotPresent)?;
}

let input_stream = Box::new(input_stream);
Expand Down
7 changes: 5 additions & 2 deletions crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,13 @@ impl<T: WasiHttpView + WasiHttpViewExt> crate::wasi::http::types::Host for T {
.table()
.get_request(request)
.context("[outgoing_request_write] getting request")?;
let stream_id = req.body().unwrap_or_else(|| {
let stream_id = if let Some(stream_id) = req.body() {
stream_id
} else {
let (new, stream) = self
.table_mut()
.push_stream(Bytes::new(), request)
.await
.expect("[outgoing_request_write] valid output stream");
self.http_ctx_mut().streams.insert(new, stream);
let req = self
Expand All @@ -225,7 +228,7 @@ impl<T: WasiHttpView + WasiHttpViewExt> crate::wasi::http::types::Host for T {
.expect("[outgoing_request_write] request to be found");
req.set_body(new);
new
});
};
let stream = self
.table()
.get_stream(stream_id)
Expand Down

0 comments on commit 30b9cd5

Please sign in to comment.