From 0a930582e0e8216c9b6a5658da12dba152d43f44 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Wed, 23 Aug 2023 20:29:02 -0700 Subject: [PATCH] file output stream impl of HostOutputStream --- crates/wasi/src/preview2/filesystem.rs | 100 +++++++-- crates/wasi/src/preview2/host/filesystem.rs | 18 +- crates/wasi/src/preview2/host/io.rs | 224 +++++++------------- crates/wasi/src/preview2/stream.rs | 59 +----- 4 files changed, 170 insertions(+), 231 deletions(-) diff --git a/crates/wasi/src/preview2/filesystem.rs b/crates/wasi/src/preview2/filesystem.rs index c9224ce33fcf..647b57b82c2f 100644 --- a/crates/wasi/src/preview2/filesystem.rs +++ b/crates/wasi/src/preview2/filesystem.rs @@ -1,4 +1,8 @@ -use crate::preview2::{StreamRuntimeError, StreamState, Table, TableError}; +use crate::preview2::{ + FlushResult, HostOutputStream, StreamRuntimeError, StreamState, Table, TableError, + WriteReadiness, +}; +use anyhow::{anyhow, Context}; use bytes::{Bytes, BytesMut}; use std::sync::Arc; @@ -161,14 +165,6 @@ fn read_result(r: Result) -> Result<(usize, StreamState), } } -fn write_result(r: Result) -> Result<(usize, StreamState), anyhow::Error> { - match r { - Ok(0) => Ok((0, StreamState::Closed)), - Ok(n) => Ok((n, StreamState::Open)), - Err(e) => Err(StreamRuntimeError::from(anyhow::anyhow!(e)).into()), - } -} - #[derive(Clone, Copy)] pub(crate) enum FileOutputMode { Position(u64), @@ -178,35 +174,97 @@ pub(crate) enum FileOutputMode { pub(crate) struct FileOutputStream { file: Arc, mode: FileOutputMode, + task: Option>>, + closed: bool, } impl FileOutputStream { pub fn write_at(file: Arc, position: u64) -> Self { Self { file, mode: FileOutputMode::Position(position), + task: None, + closed: false, } } pub fn append(file: Arc) -> Self { Self { file, mode: FileOutputMode::Append, + task: None, + closed: false, } } - /// Write bytes. On success, returns the number of bytes written. - pub async fn write(&mut self, buf: Bytes) -> anyhow::Result<(usize, StreamState)> { +} + +#[async_trait::async_trait] +impl HostOutputStream for FileOutputStream { + fn write(&mut self, buf: Bytes) -> anyhow::Result> { use system_interface::fs::FileIoExt; + + if self.closed { + return Ok(Some(WriteReadiness::Closed)); + } + if self.task.is_some() { + // a write is pending - this call was not permitted + return Err(anyhow!( + "write not permitted: FileOutputStream write pending" + )); + } let f = Arc::clone(&self.file); let m = self.mode; - let r = tokio::task::spawn_blocking(move || match m { - FileOutputMode::Position(p) => f.write_at(buf.as_ref(), p), - FileOutputMode::Append => f.append(buf.as_ref()), - }) - .await - .unwrap(); - let (n, state) = write_result(r)?; - if let FileOutputMode::Position(ref mut position) = self.mode { - *position += n as u64; + self.task = Some(tokio::task::spawn_blocking(move || match m { + FileOutputMode::Position(p) => { + let _ = f.write_at(buf.as_ref(), p)?; // FIXME: make sure writes all + Ok(()) + } + FileOutputMode::Append => { + let _ = f.append(buf.as_ref())?; // FIXME: make sure writes all + Ok(()) + } + })); + Ok(None) + } + fn flush(&mut self) -> anyhow::Result> { + if self.closed { + return Ok(Some(FlushResult::Closed)); + } + if self.task.is_none() { + return Ok(Some(FlushResult::Done)); + } + Ok(None) + } + async fn write_ready(&mut self) -> anyhow::Result { + if self.closed { + return Ok(WriteReadiness::Closed); + } + if let Some(t) = self.task.take() { + match t.await.context("join of FileOutputStream worker task")? { + Ok(()) => Ok(WriteReadiness::Ready(64 * 1024)), + Err(e) => { + tracing::debug!("FileOutputStream closed with {e:?}"); + self.closed = true; + Ok(WriteReadiness::Closed) + } + } + } else { + Ok(WriteReadiness::Ready(64 * 1024)) + } + } + async fn flush_ready(&mut self) -> anyhow::Result { + if self.closed { + return Ok(FlushResult::Closed); + } + if let Some(t) = self.task.take() { + match t.await.context("join of FileOutputStream worker task")? { + Ok(()) => Ok(FlushResult::Done), + Err(e) => { + tracing::debug!("FileOutputStream closed with {e:?}"); + self.closed = true; + Ok(FlushResult::Closed) + } + } + } else { + Ok(FlushResult::Done) } - Ok((n, state)) } } diff --git a/crates/wasi/src/preview2/host/filesystem.rs b/crates/wasi/src/preview2/host/filesystem.rs index 189f5ca34aea..d286c7bf5e31 100644 --- a/crates/wasi/src/preview2/host/filesystem.rs +++ b/crates/wasi/src/preview2/host/filesystem.rs @@ -777,10 +777,7 @@ impl types::Host for T { fd: types::Descriptor, offset: types::Filesize, ) -> Result { - use crate::preview2::{ - filesystem::FileOutputStream, - stream::{InternalOutputStream, InternalTableStreamExt}, - }; + use crate::preview2::{filesystem::FileOutputStream, TableStreamExt}; // Trap if fd lookup fails: let f = self.table().get_file(fd)?; @@ -796,9 +793,7 @@ impl types::Host for T { let writer = FileOutputStream::write_at(clone, offset); // Insert the stream view into the table. Trap if the table is full. - let index = self - .table_mut() - .push_internal_output_stream(InternalOutputStream::File(writer))?; + let index = self.table_mut().push_output_stream(Box::new(writer))?; Ok(index) } @@ -807,10 +802,7 @@ impl types::Host for T { &mut self, fd: types::Descriptor, ) -> Result { - use crate::preview2::{ - filesystem::FileOutputStream, - stream::{InternalOutputStream, InternalTableStreamExt}, - }; + use crate::preview2::{filesystem::FileOutputStream, TableStreamExt}; // Trap if fd lookup fails: let f = self.table().get_file(fd)?; @@ -825,9 +817,7 @@ impl types::Host for T { let appender = FileOutputStream::append(clone); // Insert the stream view into the table. Trap if the table is full. - let index = self - .table_mut() - .push_internal_output_stream(InternalOutputStream::File(appender))?; + let index = self.table_mut().push_output_stream(Box::new(appender))?; Ok(index) } diff --git a/crates/wasi/src/preview2/host/io.rs b/crates/wasi/src/preview2/host/io.rs index b90ea9578501..06a96d17dc0c 100644 --- a/crates/wasi/src/preview2/host/io.rs +++ b/crates/wasi/src/preview2/host/io.rs @@ -1,11 +1,11 @@ use crate::preview2::{ bindings::io::streams::{self, InputStream, OutputStream}, bindings::poll::poll::Pollable, - filesystem::{FileInputStream, FileOutputStream}, + filesystem::FileInputStream, poll::PollableFuture, stream::{ - FlushResult, HostInputStream, HostOutputStream, InternalInputStream, InternalOutputStream, - InternalTableStreamExt, StreamRuntimeError, StreamState, WriteReadiness, + FlushResult, HostInputStream, HostOutputStream, InternalInputStream, + InternalTableStreamExt, StreamRuntimeError, StreamState, TableStreamExt, WriteReadiness, }, HostPollable, TablePollableExt, WasiView, }; @@ -46,7 +46,7 @@ impl streams::Host for T { } async fn drop_output_stream(&mut self, stream: OutputStream) -> anyhow::Result<()> { - self.table_mut().delete_internal_output_stream(stream)?; + self.table_mut().delete_output_stream(stream)?; Ok(()) } @@ -245,19 +245,12 @@ impl streams::Host for T { &mut self, stream: OutputStream, ) -> anyhow::Result> { - match self.table_mut().get_internal_output_stream_mut(stream)? { - InternalOutputStream::Host(s) => { - match futures::future::poll_immediate(HostOutputStream::write_ready(s.as_mut())) - .await - { - Some(Ok(readiness)) => Ok(Some(readiness.into())), - Some(Err(e)) => Err(e), - None => Ok(None), - } - } - // FIXME: we need to bound this by the size of the file, if its not append. we can pick - // a default size for this in wasi ctx and allow the user to override it. - InternalOutputStream::File(_) => Ok(Some(streams::WriteReadiness::Ready(32 * 1024))), + let s = self.table_mut().get_output_stream_mut(stream)?; + // FIXME StreamRuntimeError to close stream + match futures::future::poll_immediate(s.write_ready()).await { + Some(Ok(readiness)) => Ok(Some(readiness.into())), + Some(Err(e)) => Err(e), + None => Ok(None), } } async fn write( @@ -265,83 +258,51 @@ impl streams::Host for T { stream: OutputStream, bytes: Vec, ) -> anyhow::Result> { - match self.table_mut().get_internal_output_stream_mut(stream)? { - InternalOutputStream::Host(s) => { - match HostOutputStream::write(s.as_mut(), bytes.into()) { - Ok(Some(readiness)) => Ok(Some(readiness.into())), - Ok(None) => Ok(None), - Err(e) => { - if let Some(e) = e.downcast_ref::() { - tracing::debug!("stream runtime error: {e:?}"); - return Ok(Some(streams::WriteReadiness::Closed)); - } else { - return Err(e); - } - } + let s = self.table_mut().get_output_stream_mut(stream)?; + match HostOutputStream::write(s, bytes.into()) { + Ok(Some(readiness)) => Ok(Some(readiness.into())), + Ok(None) => Ok(None), + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Some(streams::WriteReadiness::Closed)); + } else { + return Err(e); } } - // FIXME: change FileOutputStream::write to align with these new semantics - InternalOutputStream::File(s) => match FileOutputStream::write(s, bytes.into()).await { - Ok((_, StreamState::Open)) => Ok(Some(streams::WriteReadiness::Ready(32 * 1024))), - Ok((0, StreamState::Closed)) => Ok(Some(streams::WriteReadiness::Closed)), - Ok((_, StreamState::Closed)) => { - todo!("idk how to represent this case of partial success with the current wit") - } - Err(e) => { - if let Some(e) = e.downcast_ref::() { - tracing::debug!("stream runtime error: {e:?}"); - Ok(Some(streams::WriteReadiness::Closed)) - } else { - Err(e) - } - } - }, } } async fn subscribe_to_write_ready(&mut self, stream: OutputStream) -> anyhow::Result { // Ensure that table element is an output-stream: - let pollable = match self.table_mut().get_internal_output_stream_mut(stream)? { - InternalOutputStream::Host(_) => { - fn output_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> { - let stream = stream - .downcast_mut::() - .expect("downcast to HostOutputStream failed"); - match *stream { - InternalOutputStream::Host(ref mut hs) => Box::pin(async move { - let _ = hs.write_ready().await?; - Ok(()) - }), - _ => unreachable!(), - } - } + let _ = self.table_mut().get_output_stream_mut(stream)?; - HostPollable::TableEntry { - index: stream, - make_future: output_stream_ready, - } - } - InternalOutputStream::File(_) => { - HostPollable::Closure(Box::new(|| Box::pin(futures::future::ready(Ok(()))))) - } - }; + fn output_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> { + let stream = stream + .downcast_mut::>() + .expect("downcast to HostOutputStream failed"); + Box::pin(async move { + let _ = stream.write_ready().await?; + Ok(()) + }) + } - Ok(self.table_mut().push_host_pollable(pollable)?) + Ok(self + .table_mut() + .push_host_pollable(HostPollable::TableEntry { + index: stream, + make_future: output_stream_ready, + })?) } async fn blocking_check_write( &mut self, stream: OutputStream, ) -> anyhow::Result { - match self.table_mut().get_internal_output_stream_mut(stream)? { - InternalOutputStream::Host(h) => { - // await until actually ready for writing: - let _ = h.write_ready().await?; - } - _ => {} - } - let check = self.check_write(stream).await?; - Ok(check.expect("check_write is ready: write_ready future completed")) + let s = self.table_mut().get_output_stream_mut(stream)?; + // await until actually ready for writing: + // FIXME StreamRuntimeError to close stream + Ok(s.write_ready().await?.into()) } async fn write_zeroes( @@ -349,22 +310,18 @@ impl streams::Host for T { stream: OutputStream, len: u64, ) -> anyhow::Result> { - match self.table_mut().get_internal_output_stream_mut(stream)? { - InternalOutputStream::Host(s) => { - match HostOutputStream::write_zeroes(s.as_mut(), len as usize) { - Ok(Some(readiness)) => Ok(Some(readiness.into())), - Ok(None) => Ok(None), - Err(e) => { - if let Some(e) = e.downcast_ref::() { - tracing::debug!("stream runtime error: {e:?}"); - return Ok(Some(streams::WriteReadiness::Closed)); - } else { - return Err(e); - } - } + let s = self.table_mut().get_output_stream_mut(stream)?; + match HostOutputStream::write_zeroes(s, len as usize) { + Ok(Some(readiness)) => Ok(Some(readiness.into())), + Ok(None) => Ok(None), + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Some(streams::WriteReadiness::Closed)); + } else { + return Err(e); } } - InternalOutputStream::File { .. } => todo!("write_zeroes unimplemented for files"), } } @@ -372,72 +329,55 @@ impl streams::Host for T { &mut self, stream: OutputStream, ) -> anyhow::Result> { - match self.table_mut().get_internal_output_stream_mut(stream)? { - InternalOutputStream::Host(s) => match HostOutputStream::flush(s.as_mut())? { - Some(result) => Ok(Some(result.into())), - None => Ok(None), - }, - _ => todo!("flush unimplemented for files"), + let s = self.table_mut().get_output_stream_mut(stream)?; + // FIXME StreamRuntimeError to close stream + match HostOutputStream::flush(s)? { + Some(result) => Ok(Some(result.into())), + None => Ok(None), } } async fn check_flush( &mut self, stream: OutputStream, ) -> anyhow::Result> { - match self.table_mut().get_internal_output_stream_mut(stream)? { - InternalOutputStream::Host(s) => { - match futures::future::poll_immediate(HostOutputStream::flush_ready(s.as_mut())) - .await - { - Some(Ok(result)) => Ok(Some(result.into())), - Some(Err(e)) => Err(e), - None => Ok(None), - } - } - // FIXME: FileOutputStream needs to impl flush. - InternalOutputStream::File(_) => Ok(Some(streams::FlushResult::Done)), + let s = self.table_mut().get_output_stream_mut(stream)?; + // FIXME StreamRuntimeError to close stream + match futures::future::poll_immediate(s.flush_ready()).await { + Some(Ok(result)) => Ok(Some(result.into())), + Some(Err(e)) => Err(e), + None => Ok(None), } } async fn blocking_flush( &mut self, stream: OutputStream, ) -> anyhow::Result { - match self.table_mut().get_internal_output_stream_mut(stream)? { - InternalOutputStream::Host(s) => { - let _ = HostOutputStream::flush(s.as_mut())?; - Ok(HostOutputStream::flush_ready(s.as_mut()).await?.into()) - } - _ => todo!("blocking_flush unimplemented for files"), - } + let s = self.table_mut().get_output_stream_mut(stream)?; + // FIXME StreamRuntimeError to close stream + HostOutputStream::flush(s)?; + Ok(HostOutputStream::flush_ready(s).await?.into()) } async fn subscribe_to_flush(&mut self, stream: OutputStream) -> anyhow::Result { // Ensure that table element is an output-stream: - let pollable = match self.table_mut().get_internal_output_stream_mut(stream)? { - InternalOutputStream::Host(_) => { - fn output_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> { - let stream = stream - .downcast_mut::() - .expect("downcast to HostOutputStream failed"); - match *stream { - InternalOutputStream::Host(ref mut hs) => Box::pin(async move { - let _ = hs.flush_ready().await?; - Ok(()) - }), - _ => unreachable!(), - } - } + let _ = self.table_mut().get_output_stream_mut(stream)?; - HostPollable::TableEntry { - index: stream, - make_future: output_stream_ready, - } - } - InternalOutputStream::File(_) => { - HostPollable::Closure(Box::new(|| Box::pin(futures::future::ready(Ok(()))))) - } - }; - Ok(self.table_mut().push_host_pollable(pollable)?) + fn output_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> { + let stream = stream + .downcast_mut::>() + .expect("downcast to HostOutputStream failed"); + Box::pin(async move { + let _ = stream.flush_ready().await?; + Ok(()) + }) + } + + Ok(self + .table_mut() + .push_host_pollable(HostPollable::TableEntry { + index: stream, + make_future: output_stream_ready, + })?) } /* -------------------------------------------------------------- * diff --git a/crates/wasi/src/preview2/stream.rs b/crates/wasi/src/preview2/stream.rs index c2cd0c58197a..8a3bac113a3c 100644 --- a/crates/wasi/src/preview2/stream.rs +++ b/crates/wasi/src/preview2/stream.rs @@ -1,4 +1,4 @@ -use crate::preview2::filesystem::{FileInputStream, FileOutputStream}; +use crate::preview2::filesystem::FileInputStream; use crate::preview2::{Table, TableError}; use anyhow::Error; use bytes::Bytes; @@ -126,11 +126,6 @@ pub(crate) enum InternalInputStream { File(FileInputStream), } -pub(crate) enum InternalOutputStream { - Host(Box), - File(FileOutputStream), -} - pub(crate) trait InternalTableStreamExt { fn push_internal_input_stream( &mut self, @@ -141,19 +136,6 @@ pub(crate) trait InternalTableStreamExt { fd: u32, ) -> Result<&mut InternalInputStream, TableError>; fn delete_internal_input_stream(&mut self, fd: u32) -> Result; - - fn push_internal_output_stream( - &mut self, - ostream: InternalOutputStream, - ) -> Result; - fn get_internal_output_stream_mut( - &mut self, - fd: u32, - ) -> Result<&mut InternalOutputStream, TableError>; - fn delete_internal_output_stream( - &mut self, - fd: u32, - ) -> Result; } impl InternalTableStreamExt for Table { fn push_internal_input_stream( @@ -171,25 +153,6 @@ impl InternalTableStreamExt for Table { fn delete_internal_input_stream(&mut self, fd: u32) -> Result { self.delete(fd) } - - fn push_internal_output_stream( - &mut self, - ostream: InternalOutputStream, - ) -> Result { - self.push(Box::new(ostream)) - } - fn get_internal_output_stream_mut( - &mut self, - fd: u32, - ) -> Result<&mut InternalOutputStream, TableError> { - self.get_mut(fd) - } - fn delete_internal_output_stream( - &mut self, - fd: u32, - ) -> Result { - self.delete(fd) - } } /// Extension trait for managing [`HostInputStream`]s and [`HostOutputStream`]s in the [`Table`]. @@ -238,26 +201,14 @@ impl TableStreamExt for Table { &mut self, ostream: Box, ) -> Result { - self.push_internal_output_stream(InternalOutputStream::Host(ostream)) + self.push(Box::new(ostream)) } fn get_output_stream_mut(&mut self, fd: u32) -> Result<&mut dyn HostOutputStream, TableError> { - match self.get_internal_output_stream_mut(fd)? { - InternalOutputStream::Host(ref mut h) => Ok(h.as_mut()), - _ => Err(TableError::WrongType), - } + let boxed: &mut Box = self.get_mut(fd)?; + Ok(boxed.as_mut()) } fn delete_output_stream(&mut self, fd: u32) -> Result, TableError> { - let occ = self.entry(fd)?; - match occ.get().downcast_ref::() { - Some(InternalOutputStream::Host(_)) => { - let any = occ.remove_entry()?; - match *any.downcast().expect("downcast checked above") { - InternalOutputStream::Host(h) => Ok(h), - _ => unreachable!("variant checked above"), - } - } - _ => Err(TableError::WrongType), - } + self.delete(fd) } }