Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: add support for anonymous unix pipes #6127

Merged
merged 10 commits into from
Dec 30, 2023
Merged

Conversation

satakuma
Copy link
Member

@satakuma satakuma commented Nov 3, 2023

Motivation

Allow users to work with anonymous Unix pipes in Tokio, i.e.:

  • create a new, nonblocking, anonymous pipe with a specialized function in tokio::net::unix::pipe
  • use Tokio types from tokio::net::unix::pipe with pipes created in other ways, such as manual pipe() syscalls.

Closes: #5547

Solution

  1. Add the tokio::net::unix::pipe::new() function, which creates an anonymous pipe. This function delegates creation of a pipe to Mio, which in turn performs either pipe() or pipe2() syscall.
  2. Add functions to do conversion from and into OwnedFd for pipe::Sender and pipe::Receiver structs. Each of the structs gets the following new methods:
  • fn into_owned_fd(self) -> io::Result<OwnedFd>
  • fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender>
  • fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender>

into_owned_fd will put the pipe end into blocking mode to prevent errors in case when the file descriptor is later passed as Stdio to a child process, which usually expects a blocking stdio. This behavior is also consistent with into_owned_fd for pipes used in tokio::process.

Similarly to from_file, from_owned_fd checks if the provided file descriptor:

  1. is a pipe,
  2. has relevant read or write access,
  3. is in non-blocking mode and if it isn't, then sets the O_NONBLOCK flag.

from_owned_fd_unchecked does none of these checks. I opted for adding both checked and unchecked variants for conversion from a file descriptor, since those types already have from_file and from_file_unchecked methods, which have similar semantics. However, this solution is a little bloated, so I am open to other ideas.

@satakuma satakuma added A-tokio Area: The main tokio crate M-net Module: tokio/net labels Nov 3, 2023
Comment on lines 24 to 27
/// If you need to create a pipe for communicating with a spawned process, you can
/// also see how to use `Stdio::piped()` with [`tokio::process`].
///
/// [`tokio::process`]: crate::process
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this sentence. Are you saying that if they want to do this, they should read the source code of tokio::process to figure out how?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to encourage users to first see if using a combination like Command::stdout(Stdio::piped()) and ChildStdout is enough for their use case of anonymous pipes when spawning a process. This interface has been in tokio and the standard library for ages, and is arguably easier to use than tokio::net::unix::pipe, although it's obviously limited to stdin/stdout/stderr. However this module will be the first to show up if a user tries to search "pipe" in tokio's docs.

In the module level documentation of tokio::process there are examples using Stdio::piped(), but I don't know how to directly link to them.

/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new() -> io::Result<(Sender, Receiver)> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to call this something else. We have similar methods for various message passing channels, and there the method is called "channel".

Suggested change
pub fn new() -> io::Result<(Sender, Receiver)> {
pub fn pipe() -> io::Result<(Sender, Receiver)> {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, pipe sounds more reasonable.

/// it in blocking mode and perform the conversion.
pub fn into_owned_fd(self) -> io::Result<OwnedFd> {
let mio_pipe = self.io.into_inner()?;
set_blocking(&mio_pipe)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if set_blocking fails? Is the fd closed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file descriptor gets closed once mio_pipe is dropped.

Comment on lines 701 to 705
/// This function will deregister this pipe end from the event loop, set
/// it in blocking mode and perform the conversion.
pub fn into_owned_fd(self) -> io::Result<OwnedFd> {
let mio_pipe = self.io.into_inner()?;
set_blocking(&mio_pipe)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should set the fd into blocking mode.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the one hand, all other methods in net such as TcpStream::into_std do not change the non-blocking mode. On the other hand, ChildStdio, which also represents a pipe on unix, does set the fd into blocking mode. One argument I can see behind doing so is the following example from one of the tests:

let (tx, mut rx) = pipe::new()?;
let status = Command::new("echo")
.arg("-n")
.arg(DATA)
.stdout(tx.into_owned_fd()?)
.status();

Usually, the spawned process will expect a blocking stdio, and in my opinion, it's easy to forget about setting the blocking mode manually. Let me know what you think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a side note, we should document the fact that ChildStdout::into_owned_fd and others set the blocking mode.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps split into two methods: into_blocking_fd, into_nonblocking_fd.

tokio/src/net/unix/pipe.rs Show resolved Hide resolved
@satakuma satakuma force-pushed the satakuma/anonymous-pipe-fd branch from 58b0a11 to 9097f3d Compare December 9, 2023 19:41
@satakuma
Copy link
Member Author

satakuma commented Dec 9, 2023

I've addressed your review comments and divided into_owned_fd into two methods. Please take another look @Darksonn.

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only one nit, otherwise LGTM.

tokio/src/net/unix/pipe.rs Outdated Show resolved Hide resolved
@Darksonn Darksonn enabled auto-merge (squash) December 30, 2023 15:25
@Darksonn Darksonn merged commit 48345d6 into master Dec 30, 2023
72 checks passed
@Darksonn Darksonn deleted the satakuma/anonymous-pipe-fd branch December 30, 2023 15:37
kodiakhq bot pushed a commit to pdylanross/fatigue that referenced this pull request Feb 5, 2024
Bumps tokio from 1.35.1 to 1.36.0.

Release notes
Sourced from tokio's releases.

Tokio v1.36.0
1.36.0 (February 2nd, 2024)
Added

io: add tokio::io::Join (#6220)
io: implement AsyncWrite for Empty (#6235)
net: add support for anonymous unix pipes (#6127)
net: add UnixSocket (#6290)
net: expose keepalive option on TcpSocket (#6311)
sync: add {Receiver,UnboundedReceiver}::poll_recv_many (#6236)
sync: add Sender::{try_,}reserve_many (#6205)
sync: add watch::Receiver::mark_unchanged (#6252)
task: add JoinSet::try_join_next (#6280)

Changed

io: make copy cooperative (#6265)
io: make repeat and sink cooperative (#6254)
io: simplify check for empty slice (#6293)
process: use pidfd on Linux when available (#6152)
sync: use AtomicBool in broadcast channel future (#6298)

Documented

io: clarify clear_ready docs (#6304)
net: document that *Fd traits on TcpSocket are unix-only (#6294)
sync: document FIFO behavior of tokio::sync::Mutex (#6279)
chore: typographic improvements (#6262)
runtime: remove obsolete comment (#6303)
task: fix typo (#6261)

#6220: tokio-rs/tokio#6220
#6235: tokio-rs/tokio#6235
#6127: tokio-rs/tokio#6127
#6290: tokio-rs/tokio#6290
#6311: tokio-rs/tokio#6311
#6236: tokio-rs/tokio#6236
#6205: tokio-rs/tokio#6205
#6252: tokio-rs/tokio#6252
#6280: tokio-rs/tokio#6280
#6265: tokio-rs/tokio#6265
#6254: tokio-rs/tokio#6254
#6293: tokio-rs/tokio#6293
#6238: tokio-rs/tokio#6238
#6152: tokio-rs/tokio#6152
#6298: tokio-rs/tokio#6298
#6262: tokio-rs/tokio#6262
#6303: tokio-rs/tokio#6303
#6261: tokio-rs/tokio#6261


... (truncated)


Commits

eaf81ed chore: prepare Tokio v1.36.0 (#6312)
53f9e5a ci: make sure dictionary words are sorted and unique (#6316)
9077762 net: expose keepalive option on TcpSocket (#6311)
131e7b4 ci: add spellchecking (#6297)
e53b92a io: clarify clear_ready docs (#6304)
7536132 sync: use AtomicBool in broadcast channel future (#6298)
b6d0c90 macros: fix trait_method breaking change detection (#6308)
4846959 runtime: remove obsolete comment (#6303)
ec30383 net: add UnixSocket (#6290)
f80bbec io: simplify check for empty slice (#6293)
Additional commits viewable in compare view




Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


Dependabot commands and options

You can trigger Dependabot actions by commenting on this PR:

@dependabot rebase will rebase this PR
@dependabot recreate will recreate this PR, overwriting any edits that have been made to it
@dependabot merge will merge this PR after your CI passes on it
@dependabot squash and merge will squash and merge this PR after your CI passes on it
@dependabot cancel merge will cancel a previously requested merge and block automerging
@dependabot reopen will reopen this PR if it is closed
@dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
@dependabot show <dependency name> ignore conditions will show all of the ignore conditions of the specified dependency
@dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-net Module: tokio/net
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for anonymous pipes
2 participants