Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process: allow for ergonomically piping stdio of one child into another #3466

Merged
merged 6 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion tests-integration/tests/process_stdio.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::join;
use tokio::process::{Child, Command};
use tokio_test::assert_ok;

use futures::future::{self, FutureExt};
use std::convert::TryInto;
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
Expand Down Expand Up @@ -139,3 +141,52 @@ async fn try_wait() {
// Can't get id after process has exited
assert_eq!(child.id(), None);
}

#[tokio::test]
async fn pipe_from_one_command_to_another() {
let mut first = cat().spawn().expect("first cmd");
let mut third = cat().spawn().expect("third cmd");

// Convert ChildStdout to Stdio
let second_stdin: Stdio = first
.stdout
.take()
.expect("first.stdout")
.try_into()
.expect("first.stdout into Stdio");

// Convert ChildStdin to Stdio
let second_stdout: Stdio = third
.stdin
.take()
.expect("third.stdin")
.try_into()
.expect("third.stdin into Stdio");

let mut second = cat()
.stdin(second_stdin)
.stdout(second_stdout)
.spawn()
.expect("first cmd");

let msg = "hello world! please pipe this message through";

let mut stdin = first.stdin.take().expect("first.stdin");
let write = async move { stdin.write_all(msg.as_bytes()).await };

let mut stdout = third.stdout.take().expect("third.stdout");
let read = async move {
let mut data = String::new();
stdout.read_to_string(&mut data).await.map(|_| data)
};

let (read, write, first_status, second_status, third_status) =
join!(read, write, first.wait(), second.wait(), third.wait());

assert_eq!(msg, read.expect("read result"));
write.expect("write result");

assert!(first_status.expect("first status").success());
assert!(second_status.expect("second status").success());
assert!(third_status.expect("third status").success());
}
2 changes: 1 addition & 1 deletion tokio/src/io/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<E: Source> PollEvented<E> {
}

/// Deregister the inner io from the registration and returns a Result containing the inner io
#[cfg(feature = "net")]
#[cfg(any(feature = "net", feature = "process"))]
pub(crate) fn into_inner(mut self) -> io::Result<E> {
let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
self.registration.deregister(&mut inner)?;
Expand Down
70 changes: 70 additions & 0 deletions tokio/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,51 @@
//! }
//! ```
//!
//! With some coordination, we can also pipe the output of one command into
//! another.
//!
//! ```no_run
//! use tokio::join;
//! use tokio::process::Command;
//! use std::convert::TryInto;
//! use std::process::Stdio;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut echo = Command::new("echo")
//! .arg("hello world!")
//! .stdout(Stdio::piped())
//! .spawn()
//! .expect("failed to spawn echo");
//!
//! let tr_stdin: Stdio = echo
//! .stdout
//! .take()
//! .unwrap()
//! .try_into()
//! .expect("failed to convert to Stdio");
//!
//! let tr = Command::new("tr")
//! .arg("a-z")
//! .arg("A-Z")
//! .stdin(tr_stdin)
//! .stdout(Stdio::piped())
//! .spawn()
//! .expect("failed to spawn tr");
//!
//! let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output());
//!
//! assert!(echo_result.unwrap().success());
//!
//! let tr_output = tr_output.expect("failed to await tr");
//! assert!(tr_output.status.success());
//!
//! assert_eq!(tr_output.stdout, b"HELLO WORLD!\n");
//!
//! Ok(())
//! }
//! ```
//!
//! # Caveats
//!
//! ## Dropping/Cancellation
Expand Down Expand Up @@ -147,6 +192,7 @@ mod kill;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::process::kill::Kill;

use std::convert::TryInto;
use std::ffi::OsStr;
use std::future::Future;
use std::io;
Expand Down Expand Up @@ -1099,6 +1145,30 @@ impl AsyncRead for ChildStderr {
}
}

impl TryInto<Stdio> for ChildStdin {
type Error = io::Error;

fn try_into(self) -> Result<Stdio, Self::Error> {
imp::convert_to_stdio(self.inner)
}
}

impl TryInto<Stdio> for ChildStdout {
type Error = io::Error;

fn try_into(self) -> Result<Stdio, Self::Error> {
imp::convert_to_stdio(self.inner)
}
}

impl TryInto<Stdio> for ChildStderr {
type Error = io::Error;

fn try_into(self) -> Result<Stdio, Self::Error> {
imp::convert_to_stdio(self.inner)
}
}

#[cfg(unix)]
mod sys {
use std::os::unix::io::{AsRawFd, RawFd};
Expand Down
53 changes: 38 additions & 15 deletions tokio/src/process/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::pin::Pin;
use std::process::{Child as StdChild, ExitStatus};
use std::process::{Child as StdChild, ExitStatus, Stdio};
use std::task::Context;
use std::task::Poll;

Expand Down Expand Up @@ -176,6 +176,18 @@ impl AsRawFd for Pipe {
}
}

pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
let mut fd = io.into_inner()?.fd;

// Ensure that the fd to be inherited is set to *blocking* mode, as this
// is the default that virtually all programs expect to have. Those
// programs that know how to work with nonblocking stdio will know how to
// change it to nonblocking mode.
set_nonblocking(&mut fd, false)?;
Comment on lines +182 to +186
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is ok because neither end of the pipe is actually used in Tokio?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. One end of the pipe should be owned by the child process (and already closed in the parent). The other end of the pipe is moved into this function, and calling .into_inner() unwraps the PollEvented and deregisters it from the reactor.


Ok(Stdio::from(fd))
}

impl Source for Pipe {
fn register(
&mut self,
Expand Down Expand Up @@ -204,6 +216,29 @@ pub(crate) type ChildStdin = PollEvented<Pipe>;
pub(crate) type ChildStdout = PollEvented<Pipe>;
pub(crate) type ChildStderr = PollEvented<Pipe>;

fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
unsafe {
let fd = fd.as_raw_fd();
let previous = libc::fcntl(fd, libc::F_GETFL);
if previous == -1 {
return Err(io::Error::last_os_error());
}

let new = if nonblocking {
previous | libc::O_NONBLOCK
} else {
previous & !libc::O_NONBLOCK
};

let r = libc::fcntl(fd, libc::F_SETFL, new);
if r == -1 {
return Err(io::Error::last_os_error());
}
}

Ok(())
}

fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Pipe>>>
where
T: IntoRawFd,
Expand All @@ -214,20 +249,8 @@ where
};

// Set the fd to nonblocking before we pass it to the event loop
let pipe = unsafe {
let pipe = Pipe::from(io);
let fd = pipe.as_raw_fd();
let r = libc::fcntl(fd, libc::F_GETFL);
if r == -1 {
return Err(io::Error::last_os_error());
}
let r = libc::fcntl(fd, libc::F_SETFL, r | libc::O_NONBLOCK);
if r == -1 {
return Err(io::Error::last_os_error());
}

pipe
};
let mut pipe = Pipe::from(io);
set_nonblocking(&mut pipe, true)?;

Ok(Some(PollEvented::new(pipe)?))
}
36 changes: 34 additions & 2 deletions tokio/src/process/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ use std::future::Future;
use std::io;
use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle};
use std::pin::Pin;
use std::process::Stdio;
use std::process::{Child as StdChild, Command as StdCommand, ExitStatus};
use std::ptr;
use std::task::Context;
use std::task::Poll;
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
use winapi::shared::minwindef::{DWORD, FALSE};
use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE};
use winapi::um::processthreadsapi::GetCurrentProcess;
use winapi::um::threadpoollegacyapiset::UnregisterWaitEx;
use winapi::um::winbase::{RegisterWaitForSingleObject, INFINITE};
use winapi::um::winnt::{BOOLEAN, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE};
use winapi::um::winnt::{
BOOLEAN, DUPLICATE_SAME_ACCESS, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
};

#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
Expand Down Expand Up @@ -171,3 +176,30 @@ where
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
PollEvented::new(pipe).ok()
}

pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> {
let named_pipe = io.into_inner()?;

// Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually
// duplicate the handle here...
unsafe {
let mut dup_handle = INVALID_HANDLE_VALUE;
let cur_proc = GetCurrentProcess();

let status = DuplicateHandle(
cur_proc,
named_pipe.as_raw_handle(),
cur_proc,
&mut dup_handle,
0 as DWORD,
FALSE,
DUPLICATE_SAME_ACCESS,
);

if status == 0 {
return Err(io::Error::last_os_error());
}

Ok(Stdio::from_raw_handle(dup_handle))
}
}