Skip to content

Commit

Permalink
process: allow for ergonomically piping stdio of one child into anoth…
Browse files Browse the repository at this point in the history
…er (#3466)

* Add `TryInto<Stdio>` impls for `ChildStd{in,out,err}` for ergonomic
  conversions into `std::process::Stdio` so callers can perform the
  conversion without needing to manipulate raw fds/handles directly
  • Loading branch information
ipetkov authored Jan 27, 2021
1 parent f13a9dd commit 34f1d3d
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 19 deletions.
53 changes: 52 additions & 1 deletion tests-integration/tests/process_stdio.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::join;
use tokio::process::{Child, Command};
use tokio_test::assert_ok;

use futures::future::{self, FutureExt};
use std::convert::TryInto;
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
Expand Down Expand Up @@ -139,3 +141,52 @@ async fn try_wait() {
// Can't get id after process has exited
assert_eq!(child.id(), None);
}

#[tokio::test]
async fn pipe_from_one_command_to_another() {
let mut first = cat().spawn().expect("first cmd");
let mut third = cat().spawn().expect("third cmd");

// Convert ChildStdout to Stdio
let second_stdin: Stdio = first
.stdout
.take()
.expect("first.stdout")
.try_into()
.expect("first.stdout into Stdio");

// Convert ChildStdin to Stdio
let second_stdout: Stdio = third
.stdin
.take()
.expect("third.stdin")
.try_into()
.expect("third.stdin into Stdio");

let mut second = cat()
.stdin(second_stdin)
.stdout(second_stdout)
.spawn()
.expect("first cmd");

let msg = "hello world! please pipe this message through";

let mut stdin = first.stdin.take().expect("first.stdin");
let write = async move { stdin.write_all(msg.as_bytes()).await };

let mut stdout = third.stdout.take().expect("third.stdout");
let read = async move {
let mut data = String::new();
stdout.read_to_string(&mut data).await.map(|_| data)
};

let (read, write, first_status, second_status, third_status) =
join!(read, write, first.wait(), second.wait(), third.wait());

assert_eq!(msg, read.expect("read result"));
write.expect("write result");

assert!(first_status.expect("first status").success());
assert!(second_status.expect("second status").success());
assert!(third_status.expect("third status").success());
}
2 changes: 1 addition & 1 deletion tokio/src/io/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<E: Source> PollEvented<E> {
}

/// Deregister the inner io from the registration and returns a Result containing the inner io
#[cfg(feature = "net")]
#[cfg(any(feature = "net", feature = "process"))]
pub(crate) fn into_inner(mut self) -> io::Result<E> {
let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
self.registration.deregister(&mut inner)?;
Expand Down
70 changes: 70 additions & 0 deletions tokio/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,51 @@
//! }
//! ```
//!
//! With some coordination, we can also pipe the output of one command into
//! another.
//!
//! ```no_run
//! use tokio::join;
//! use tokio::process::Command;
//! use std::convert::TryInto;
//! use std::process::Stdio;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut echo = Command::new("echo")
//! .arg("hello world!")
//! .stdout(Stdio::piped())
//! .spawn()
//! .expect("failed to spawn echo");
//!
//! let tr_stdin: Stdio = echo
//! .stdout
//! .take()
//! .unwrap()
//! .try_into()
//! .expect("failed to convert to Stdio");
//!
//! let tr = Command::new("tr")
//! .arg("a-z")
//! .arg("A-Z")
//! .stdin(tr_stdin)
//! .stdout(Stdio::piped())
//! .spawn()
//! .expect("failed to spawn tr");
//!
//! let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output());
//!
//! assert!(echo_result.unwrap().success());
//!
//! let tr_output = tr_output.expect("failed to await tr");
//! assert!(tr_output.status.success());
//!
//! assert_eq!(tr_output.stdout, b"HELLO WORLD!\n");
//!
//! Ok(())
//! }
//! ```
//!
//! # Caveats
//!
//! ## Dropping/Cancellation
Expand Down Expand Up @@ -147,6 +192,7 @@ mod kill;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::process::kill::Kill;

use std::convert::TryInto;
use std::ffi::OsStr;
use std::future::Future;
use std::io;
Expand Down Expand Up @@ -1099,6 +1145,30 @@ impl AsyncRead for ChildStderr {
}
}

impl TryInto<Stdio> for ChildStdin {
type Error = io::Error;

fn try_into(self) -> Result<Stdio, Self::Error> {
imp::convert_to_stdio(self.inner)
}
}

impl TryInto<Stdio> for ChildStdout {
type Error = io::Error;

fn try_into(self) -> Result<Stdio, Self::Error> {
imp::convert_to_stdio(self.inner)
}
}

impl TryInto<Stdio> for ChildStderr {
type Error = io::Error;

fn try_into(self) -> Result<Stdio, Self::Error> {
imp::convert_to_stdio(self.inner)
}
}

#[cfg(unix)]
mod sys {
use std::os::unix::io::{AsRawFd, RawFd};
Expand Down
53 changes: 38 additions & 15 deletions tokio/src/process/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::pin::Pin;
use std::process::{Child as StdChild, ExitStatus};
use std::process::{Child as StdChild, ExitStatus, Stdio};
use std::task::Context;
use std::task::Poll;

Expand Down Expand Up @@ -176,6 +176,18 @@ impl AsRawFd for Pipe {
}
}

pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
let mut fd = io.into_inner()?.fd;

// Ensure that the fd to be inherited is set to *blocking* mode, as this
// is the default that virtually all programs expect to have. Those
// programs that know how to work with nonblocking stdio will know how to
// change it to nonblocking mode.
set_nonblocking(&mut fd, false)?;

Ok(Stdio::from(fd))
}

impl Source for Pipe {
fn register(
&mut self,
Expand Down Expand Up @@ -204,6 +216,29 @@ pub(crate) type ChildStdin = PollEvented<Pipe>;
pub(crate) type ChildStdout = PollEvented<Pipe>;
pub(crate) type ChildStderr = PollEvented<Pipe>;

fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
unsafe {
let fd = fd.as_raw_fd();
let previous = libc::fcntl(fd, libc::F_GETFL);
if previous == -1 {
return Err(io::Error::last_os_error());
}

let new = if nonblocking {
previous | libc::O_NONBLOCK
} else {
previous & !libc::O_NONBLOCK
};

let r = libc::fcntl(fd, libc::F_SETFL, new);
if r == -1 {
return Err(io::Error::last_os_error());
}
}

Ok(())
}

fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Pipe>>>
where
T: IntoRawFd,
Expand All @@ -214,20 +249,8 @@ where
};

// Set the fd to nonblocking before we pass it to the event loop
let pipe = unsafe {
let pipe = Pipe::from(io);
let fd = pipe.as_raw_fd();
let r = libc::fcntl(fd, libc::F_GETFL);
if r == -1 {
return Err(io::Error::last_os_error());
}
let r = libc::fcntl(fd, libc::F_SETFL, r | libc::O_NONBLOCK);
if r == -1 {
return Err(io::Error::last_os_error());
}

pipe
};
let mut pipe = Pipe::from(io);
set_nonblocking(&mut pipe, true)?;

Ok(Some(PollEvented::new(pipe)?))
}
36 changes: 34 additions & 2 deletions tokio/src/process/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ use std::future::Future;
use std::io;
use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle};
use std::pin::Pin;
use std::process::Stdio;
use std::process::{Child as StdChild, Command as StdCommand, ExitStatus};
use std::ptr;
use std::task::Context;
use std::task::Poll;
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
use winapi::shared::minwindef::{DWORD, FALSE};
use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE};
use winapi::um::processthreadsapi::GetCurrentProcess;
use winapi::um::threadpoollegacyapiset::UnregisterWaitEx;
use winapi::um::winbase::{RegisterWaitForSingleObject, INFINITE};
use winapi::um::winnt::{BOOLEAN, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE};
use winapi::um::winnt::{
BOOLEAN, DUPLICATE_SAME_ACCESS, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
};

#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
Expand Down Expand Up @@ -171,3 +176,30 @@ where
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
PollEvented::new(pipe).ok()
}

pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> {
let named_pipe = io.into_inner()?;

// Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually
// duplicate the handle here...
unsafe {
let mut dup_handle = INVALID_HANDLE_VALUE;
let cur_proc = GetCurrentProcess();

let status = DuplicateHandle(
cur_proc,
named_pipe.as_raw_handle(),
cur_proc,
&mut dup_handle,
0 as DWORD,
FALSE,
DUPLICATE_SAME_ACCESS,
);

if status == 0 {
return Err(io::Error::last_os_error());
}

Ok(Stdio::from_raw_handle(dup_handle))
}
}

0 comments on commit 34f1d3d

Please sign in to comment.