diff --git a/tests-integration/tests/process_stdio.rs b/tests-integration/tests/process_stdio.rs index c01172e2067..89ac679d835 100644 --- a/tests-integration/tests/process_stdio.rs +++ b/tests-integration/tests/process_stdio.rs @@ -1,11 +1,13 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio::join; use tokio::process::{Child, Command}; use tokio_test::assert_ok; use futures::future::{self, FutureExt}; +use std::convert::TryInto; use std::env; use std::io; use std::process::{ExitStatus, Stdio}; @@ -139,3 +141,52 @@ async fn try_wait() { // Can't get id after process has exited assert_eq!(child.id(), None); } + +#[tokio::test] +async fn pipe_from_one_command_to_another() { + let mut first = cat().spawn().expect("first cmd"); + let mut third = cat().spawn().expect("third cmd"); + + // Convert ChildStdout to Stdio + let second_stdin: Stdio = first + .stdout + .take() + .expect("first.stdout") + .try_into() + .expect("first.stdout into Stdio"); + + // Convert ChildStdin to Stdio + let second_stdout: Stdio = third + .stdin + .take() + .expect("third.stdin") + .try_into() + .expect("third.stdin into Stdio"); + + let mut second = cat() + .stdin(second_stdin) + .stdout(second_stdout) + .spawn() + .expect("first cmd"); + + let msg = "hello world! please pipe this message through"; + + let mut stdin = first.stdin.take().expect("first.stdin"); + let write = async move { stdin.write_all(msg.as_bytes()).await }; + + let mut stdout = third.stdout.take().expect("third.stdout"); + let read = async move { + let mut data = String::new(); + stdout.read_to_string(&mut data).await.map(|_| data) + }; + + let (read, write, first_status, second_status, third_status) = + join!(read, write, first.wait(), second.wait(), third.wait()); + + assert_eq!(msg, read.expect("read result")); + write.expect("write result"); + + assert!(first_status.expect("first status").success()); + assert!(second_status.expect("second status").success()); + assert!(third_status.expect("third status").success()); +} diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 0ecdb18066f..27a4cb7c1c2 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -126,7 +126,7 @@ impl PollEvented { } /// Deregister the inner io from the registration and returns a Result containing the inner io - #[cfg(feature = "net")] + #[cfg(any(feature = "net", feature = "process"))] pub(crate) fn into_inner(mut self) -> io::Result { let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. self.registration.deregister(&mut inner)?; diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index bed50a7aa63..320f59063ed 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -97,6 +97,51 @@ //! } //! ``` //! +//! With some coordination, we can also pipe the output of one command into +//! another. +//! +//! ```no_run +//! use tokio::join; +//! use tokio::process::Command; +//! use std::convert::TryInto; +//! use std::process::Stdio; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! let mut echo = Command::new("echo") +//! .arg("hello world!") +//! .stdout(Stdio::piped()) +//! .spawn() +//! .expect("failed to spawn echo"); +//! +//! let tr_stdin: Stdio = echo +//! .stdout +//! .take() +//! .unwrap() +//! .try_into() +//! .expect("failed to convert to Stdio"); +//! +//! let tr = Command::new("tr") +//! .arg("a-z") +//! .arg("A-Z") +//! .stdin(tr_stdin) +//! .stdout(Stdio::piped()) +//! .spawn() +//! .expect("failed to spawn tr"); +//! +//! let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output()); +//! +//! assert!(echo_result.unwrap().success()); +//! +//! let tr_output = tr_output.expect("failed to await tr"); +//! assert!(tr_output.status.success()); +//! +//! assert_eq!(tr_output.stdout, b"HELLO WORLD!\n"); +//! +//! Ok(()) +//! } +//! ``` +//! //! # Caveats //! //! ## Dropping/Cancellation @@ -147,6 +192,7 @@ mod kill; use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::process::kill::Kill; +use std::convert::TryInto; use std::ffi::OsStr; use std::future::Future; use std::io; @@ -1099,6 +1145,30 @@ impl AsyncRead for ChildStderr { } } +impl TryInto for ChildStdin { + type Error = io::Error; + + fn try_into(self) -> Result { + imp::convert_to_stdio(self.inner) + } +} + +impl TryInto for ChildStdout { + type Error = io::Error; + + fn try_into(self) -> Result { + imp::convert_to_stdio(self.inner) + } +} + +impl TryInto for ChildStderr { + type Error = io::Error; + + fn try_into(self) -> Result { + imp::convert_to_stdio(self.inner) + } +} + #[cfg(unix)] mod sys { use std::os::unix::io::{AsRawFd, RawFd}; diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index 3608b9f1bc0..852a191b091 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -43,7 +43,7 @@ use std::future::Future; use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::pin::Pin; -use std::process::{Child as StdChild, ExitStatus}; +use std::process::{Child as StdChild, ExitStatus, Stdio}; use std::task::Context; use std::task::Poll; @@ -176,6 +176,18 @@ impl AsRawFd for Pipe { } } +pub(crate) fn convert_to_stdio(io: PollEvented) -> io::Result { + let mut fd = io.into_inner()?.fd; + + // Ensure that the fd to be inherited is set to *blocking* mode, as this + // is the default that virtually all programs expect to have. Those + // programs that know how to work with nonblocking stdio will know how to + // change it to nonblocking mode. + set_nonblocking(&mut fd, false)?; + + Ok(Stdio::from(fd)) +} + impl Source for Pipe { fn register( &mut self, @@ -204,6 +216,29 @@ pub(crate) type ChildStdin = PollEvented; pub(crate) type ChildStdout = PollEvented; pub(crate) type ChildStderr = PollEvented; +fn set_nonblocking(fd: &mut T, nonblocking: bool) -> io::Result<()> { + unsafe { + let fd = fd.as_raw_fd(); + let previous = libc::fcntl(fd, libc::F_GETFL); + if previous == -1 { + return Err(io::Error::last_os_error()); + } + + let new = if nonblocking { + previous | libc::O_NONBLOCK + } else { + previous & !libc::O_NONBLOCK + }; + + let r = libc::fcntl(fd, libc::F_SETFL, new); + if r == -1 { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) +} + fn stdio(option: Option) -> io::Result>> where T: IntoRawFd, @@ -214,20 +249,8 @@ where }; // Set the fd to nonblocking before we pass it to the event loop - let pipe = unsafe { - let pipe = Pipe::from(io); - let fd = pipe.as_raw_fd(); - let r = libc::fcntl(fd, libc::F_GETFL); - if r == -1 { - return Err(io::Error::last_os_error()); - } - let r = libc::fcntl(fd, libc::F_SETFL, r | libc::O_NONBLOCK); - if r == -1 { - return Err(io::Error::last_os_error()); - } - - pipe - }; + let mut pipe = Pipe::from(io); + set_nonblocking(&mut pipe, true)?; Ok(Some(PollEvented::new(pipe)?)) } diff --git a/tokio/src/process/windows.rs b/tokio/src/process/windows.rs index 1aa6c89043a..7237525da30 100644 --- a/tokio/src/process/windows.rs +++ b/tokio/src/process/windows.rs @@ -26,14 +26,19 @@ use std::future::Future; use std::io; use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle}; use std::pin::Pin; +use std::process::Stdio; use std::process::{Child as StdChild, Command as StdCommand, ExitStatus}; use std::ptr; use std::task::Context; use std::task::Poll; -use winapi::um::handleapi::INVALID_HANDLE_VALUE; +use winapi::shared::minwindef::{DWORD, FALSE}; +use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE}; +use winapi::um::processthreadsapi::GetCurrentProcess; use winapi::um::threadpoollegacyapiset::UnregisterWaitEx; use winapi::um::winbase::{RegisterWaitForSingleObject, INFINITE}; -use winapi::um::winnt::{BOOLEAN, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE}; +use winapi::um::winnt::{ + BOOLEAN, DUPLICATE_SAME_ACCESS, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, +}; #[must_use = "futures do nothing unless polled"] pub(crate) struct Child { @@ -171,3 +176,30 @@ where let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) }; PollEvented::new(pipe).ok() } + +pub(crate) fn convert_to_stdio(io: PollEvented) -> io::Result { + let named_pipe = io.into_inner()?; + + // Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually + // duplicate the handle here... + unsafe { + let mut dup_handle = INVALID_HANDLE_VALUE; + let cur_proc = GetCurrentProcess(); + + let status = DuplicateHandle( + cur_proc, + named_pipe.as_raw_handle(), + cur_proc, + &mut dup_handle, + 0 as DWORD, + FALSE, + DUPLICATE_SAME_ACCESS, + ); + + if status == 0 { + return Err(io::Error::last_os_error()); + } + + Ok(Stdio::from_raw_handle(dup_handle)) + } +}