From 23a3f15716118e0d1b96ab630fb5926582400597 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 14 Aug 2023 23:58:18 +0200 Subject: [PATCH] Update stdio on Unix to fall back to worker threads (#6833) Not all file descriptors can get registered with epoll, for example files on Linux or `/dev/null` on macOS return errors. In these situations the fallback of the worker thread implementation is used instead. --- crates/wasi/src/preview2/stdio/unix.rs | 77 ++++++++++++------- .../src/preview2/stdio/worker_thread_stdin.rs | 1 + 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/crates/wasi/src/preview2/stdio/unix.rs b/crates/wasi/src/preview2/stdio/unix.rs index 9e24efc95f50..b624b874e8cf 100644 --- a/crates/wasi/src/preview2/stdio/unix.rs +++ b/crates/wasi/src/preview2/stdio/unix.rs @@ -1,3 +1,4 @@ +use super::worker_thread_stdin; use crate::preview2::{pipe::AsyncReadStream, HostInputStream, StreamState}; use anyhow::Error; use bytes::Bytes; @@ -16,26 +17,38 @@ use tokio::io::{AsyncRead, ReadBuf}; static STDIN: OnceLock<Stdin> = OnceLock::new(); #[derive(Clone)] -pub struct Stdin(Arc<Mutex<AsyncReadStream>>); +pub enum Stdin { + // The process's standard input can be successfully registered with `epoll`, + // so it's tracked by a native async stream. + Async(Arc<Mutex<AsyncReadStream>>), + + // The process's stdin can't be registered with epoll, for example it's a + // file on Linux or `/dev/null` on macOS. The fallback implementation of a + // worker thread is used in these situations.
+ Blocking(worker_thread_stdin::Stdin), +} pub fn stdin() -> Stdin { - fn init_stdin() -> AsyncReadStream { + fn init_stdin() -> anyhow::Result<AsyncReadStream> { use crate::preview2::RUNTIME; match tokio::runtime::Handle::try_current() { - Ok(_) => AsyncReadStream::new(InnerStdin::new().unwrap()), + Ok(_) => Ok(AsyncReadStream::new(InnerStdin::new()?)), Err(_) => { let _enter = RUNTIME.enter(); - RUNTIME.block_on(async { AsyncReadStream::new(InnerStdin::new().unwrap()) }) + RUNTIME.block_on(async { Ok(AsyncReadStream::new(InnerStdin::new()?)) }) } } } let handle = STDIN - .get_or_init(|| Stdin(Arc::new(Mutex::new(init_stdin())))) + .get_or_init(|| match init_stdin() { + Ok(stream) => Stdin::Async(Arc::new(Mutex::new(stream))), + Err(_) => Stdin::Blocking(worker_thread_stdin::stdin()), + }) .clone(); - { - let mut guard = handle.0.lock().unwrap(); + if let Stdin::Async(stream) = &handle { + let mut guard = stream.lock().unwrap(); // The backing task exited. This can happen in two cases: // @@ -45,7 +58,7 @@ pub fn stdin() -> Stdin { // As we can't tell the difference between these two, we assume the latter and restart the // task. if guard.join_handle.is_finished() { - *guard = init_stdin(); + *guard = init_stdin().unwrap(); } } @@ -55,31 +68,39 @@ pub fn stdin() -> Stdin { #[async_trait::async_trait] impl crate::preview2::HostInputStream for Stdin { fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { - HostInputStream::read(&mut *self.0.lock().unwrap(), size) + match self { + Stdin::Async(s) => HostInputStream::read(&mut *s.lock().unwrap(), size), + Stdin::Blocking(s) => s.read(size), + } } async fn ready(&mut self) -> Result<(), Error> { - // Custom Future impl takes the std mutex in each invocation of poll. - // Required so we don't have to use a tokio mutex, which we can't take from - // inside a sync context in Self::read.
- // - // Taking the lock, creating a fresh ready() future, polling it once, and - // then releasing the lock is acceptable here because the ready() future - // is only ever going to await on a single channel recv, plus some management - // of a state machine (for buffering). - struct Ready<'a> { - handle: &'a Stdin, - } - impl<'a> Future for Ready<'a> { - type Output = Result<(), Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let mut locked = self.handle.0.lock().unwrap(); - let fut = locked.ready(); - tokio::pin!(fut); - fut.poll(cx) + match self { + Stdin::Async(handle) => { + // Custom Future impl takes the std mutex in each invocation of poll. + // Required so we don't have to use a tokio mutex, which we can't take from + // inside a sync context in Self::read. + // + // Taking the lock, creating a fresh ready() future, polling it once, and + // then releasing the lock is acceptable here because the ready() future + // is only ever going to await on a single channel recv, plus some management + // of a state machine (for buffering). + struct Ready<'a> { + handle: &'a Arc<Mutex<AsyncReadStream>>, + } + impl<'a> Future for Ready<'a> { + type Output = Result<(), Error>; + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let mut locked = self.handle.lock().unwrap(); + let fut = locked.ready(); + tokio::pin!(fut); + fut.poll(cx) + } + } + Ready { handle }.await } + Stdin::Blocking(s) => s.ready().await, } - Ready { handle: self }.await } } diff --git a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs index b4fde02465e3..116c747ca5d7 100644 --- a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs +++ b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs @@ -78,6 +78,7 @@ fn create() -> GlobalStdin { } /// Only public interface is the [`HostInputStream`] impl. +#[derive(Clone)] pub struct Stdin; impl Stdin { // Private! Only required internally.