Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove child ownership from Exit filter #207

Merged
merged 5 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions examples/kqueue-process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,34 @@
target_os = "dragonfly",
))]
fn main() -> std::io::Result<()> {
use std::process::Command;
use std::{num::NonZeroI32, process::Command};

use async_io::os::kqueue::{Exit, Filter};
use futures_lite::future;

future::block_on(async {
// Spawn a process.
let process = Command::new("sleep")
let mut process = Command::new("sleep")
.arg("3")
.spawn()
.expect("failed to spawn process");

// Wrap the process in an `Async` object that waits for it to exit.
let process = Filter::new(Exit::new(process))?;
let process_handle = Filter::new(Exit::new(
NonZeroI32::new(process.id().try_into().expect("invalid process pid"))
.expect("non zero pid"),
))?;

// Wait for the process to exit.
process.ready().await?;
process_handle.ready().await?;

println!(
"Process exit code {:?}",
process
.try_wait()
.expect("error while waiting process")
.expect("process did not exit yet")
);
Ok(())
})
}
Expand Down
30 changes: 18 additions & 12 deletions src/os/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::Async;

use std::future::Future;
use std::io::{Error, Result};
use std::num::NonZeroI32;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use std::pin::Pin;
use std::process::Child;
use std::task::{Context, Poll};

/// A wrapper around a queueable object that waits until it is ready.
Expand Down Expand Up @@ -41,13 +41,14 @@ impl<T: Queueable> Filter<T> {
///
/// ```no_run
/// use std::process::Command;
/// use std::num::NonZeroI32;
/// use async_io::os::kqueue::{Exit, Filter};
///
/// // Create a new process to wait for.
/// let mut child = Command::new("sleep").arg("5").spawn().unwrap();
///
/// // Wrap the process in an `Async` object that waits for it to exit.
/// let process = Filter::new(Exit::new(child)).unwrap();
/// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap();
///
/// // Wait for the process to exit.
/// # async_io::block_on(async {
Expand Down Expand Up @@ -97,10 +98,11 @@ impl<T> Filter<T> {
///
/// ```
/// use async_io::os::kqueue::{Exit, Filter};
/// use std::num::NonZeroI32;
///
/// # futures_lite::future::block_on(async {
/// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
/// let process = Filter::new(Exit::new(child)).unwrap();
/// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap();
/// let inner = process.get_ref();
/// # });
/// ```
Expand All @@ -117,10 +119,11 @@ impl<T> Filter<T> {
///
/// ```
/// use async_io::os::kqueue::{Exit, Filter};
/// use std::num::NonZeroI32;
///
/// # futures_lite::future::block_on(async {
/// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
/// let mut process = Filter::new(Exit::new(child)).unwrap();
/// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap();
/// let inner = process.get_mut();
/// # });
/// ```
Expand All @@ -134,10 +137,11 @@ impl<T> Filter<T> {
///
/// ```
/// use async_io::os::kqueue::{Exit, Filter};
/// use std::num::NonZeroI32;
///
/// # futures_lite::future::block_on(async {
/// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
/// let process = Filter::new(Exit::new(child)).unwrap();
/// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap();
/// let inner = process.into_inner().unwrap();
/// # });
/// ```
Expand All @@ -153,12 +157,13 @@ impl<T> Filter<T> {
/// # Examples
///
/// ```no_run
/// use std::num::NonZeroI32;
/// use std::process::Command;
/// use async_io::os::kqueue::{Exit, Filter};
///
/// # futures_lite::future::block_on(async {
/// let child = Command::new("sleep").arg("5").spawn()?;
/// let process = Filter::new(Exit::new(child))?;
/// let process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap();
///
/// // Wait for the process to exit.
/// process.ready().await?;
Expand All @@ -182,13 +187,14 @@ impl<T> Filter<T> {
/// # Examples
///
/// ```no_run
/// use std::num::NonZeroI32;
/// use std::process::Command;
/// use async_io::os::kqueue::{Exit, Filter};
/// use futures_lite::future;
///
/// # futures_lite::future::block_on(async {
/// let child = Command::new("sleep").arg("5").spawn()?;
/// let process = Filter::new(Exit::new(child))?;
/// let process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap();
///
/// // Wait for the process to exit.
/// future::poll_fn(|cx| process.poll_ready(cx)).await?;
Expand Down Expand Up @@ -233,23 +239,23 @@ impl QueueableSealed for Signal {
}
impl Queueable for Signal {}

/// Wait for a child process to exit.
/// Wait for a process to exit.
///
/// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter),
/// it will return a [`readable`](crate::Async::readable) event when the child process exits.
#[derive(Debug)]
pub struct Exit(Option<Child>);
pub struct Exit(NonZeroI32);

impl Exit {
/// Create a new `Exit` object.
pub fn new(child: Child) -> Self {
Self(Some(child))
pub fn new(pid: NonZeroI32) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to avoid a breaking change, can we make this a new method instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, a new from_pid constructor added in 4308464 instead. I made it unsafe to propagate the safety statement when creating a polling process from a pid.

Wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it actually needs to be unsafe? While it's unsafe for FDs, PIDs are allowed to be racy IIRC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason behind the unsafe was to propagate the "safety" statement defined in the polling process (and used by the reactor):

/// Create a `Process` from a PID.
    ///
    /// # Safety
    ///
    /// The PID must be tied to an actual child process.
    pub unsafe fn from_pid(pid: std::num::NonZeroI32, ops: ProcessOps) -> Self {
        Self {
            pid: unsafe { rustix::process::Pid::from_raw_unchecked(pid.get()) },
            ops,
            _lt: PhantomData,
        }
    }

With the new Exit status API, the struct can be crated for any given u32 value, thus the possibility of not being tied to an actual child process (differently than child.id()). This is the error when the provided pid is not attached to a process:

Error: Os { code: 3, kind: Uncategorized, message: "No such process" }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that with the new from_pid it lets the user provide any PID value, do you still think the unsafe guidance is not needed?

Thanks for your feedback.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, keep it as unsafe for now. It's been a while since I've reviewed kqueue's implementation, I trust my past self more in that regards.

Self(pid)
}
}

impl QueueableSealed for Exit {
fn registration(&mut self) -> Registration {
Registration::Process(self.0.take().expect("Cannot reregister child"))
Registration::Process(self.0)
}
}
impl Queueable for Exit {}
Expand Down
18 changes: 9 additions & 9 deletions src/reactor/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use polling::{Event, PollMode, Poller};

use std::fmt;
use std::io::Result;
use std::num::NonZeroI32;
use std::os::unix::io::{AsRawFd, BorrowedFd, RawFd};
use std::process::Child;

/// The raw registration into the reactor.
///
Expand All @@ -27,8 +27,8 @@ pub enum Registration {
/// Raw signal number for signal delivery.
Signal(Signal),

/// Process for process termination.
Process(Child),
/// Pid for process termination.
Process(NonZeroI32),
}

impl fmt::Debug for Registration {
Expand Down Expand Up @@ -62,8 +62,8 @@ impl Registration {
Self::Signal(signal) => {
poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot)
}
Self::Process(process) => poller.add_filter(
unsafe { Process::new(process, ProcessOps::Exit) },
Self::Process(pid) => poller.add_filter(
unsafe { Process::from_pid(*pid, ProcessOps::Exit) },
token,
PollMode::Oneshot,
),
Expand All @@ -82,8 +82,8 @@ impl Registration {
Self::Signal(signal) => {
poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot)
}
Self::Process(process) => poller.modify_filter(
unsafe { Process::new(process, ProcessOps::Exit) },
Self::Process(pid) => poller.modify_filter(
unsafe { Process::from_pid(*pid, ProcessOps::Exit) },
interest.key,
PollMode::Oneshot,
),
Expand All @@ -100,8 +100,8 @@ impl Registration {
poller.delete(fd)
}
Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)),
Self::Process(process) => {
poller.delete_filter(unsafe { Process::new(process, ProcessOps::Exit) })
Self::Process(pid) => {
poller.delete_filter(unsafe { Process::from_pid(*pid, ProcessOps::Exit) })
}
}
}
Expand Down
Loading