Skip to content

Commit

Permalink
Ensure that sandboxed processes exit before their sandboxes are clean…
Browse files Browse the repository at this point in the history
…ed up (Cherry-pick of pantsbuild#18632)

As @jsirois discovered and described in
pantsbuild#16778 (comment),
because the `local::CommandRunner` does not `wait` for its child process
to have exited, it might be possible for a SIGKILL to not have taken
effect on the child process before its sandbox has been torn down. And
that could result in the process seeing a partial sandbox (and in the
case of pantsbuild#16778, potentially cause named cache corruption).

This change ensures that we block to call `wait` when spawning local
processes by wrapping them in the `ManagedChild` helper that we were
already using for interactive processes. It then attempts to clarify the
contract of the `CapturedWorkdir` trait (but in order to improve
cherry-pickability does not attempt to adjust it), to make it clear that
child processes should fully exit before the `run_and_capture_workdir`
method has returned.

Fixes pantsbuild#16778 🤞.

---------

Co-authored-by: John Sirois <[email protected]>
(cherry picked from commit 1462bef)
  • Loading branch information
stuhood authored and jsirois committed Mar 31, 2023
1 parent 0385778 commit 122d798
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 148 deletions.
83 changes: 48 additions & 35 deletions src/rust/engine/process_execution/src/children.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time};

use nix::sys::signal;
Expand All @@ -12,20 +11,22 @@ const GRACEFUL_SHUTDOWN_POLL_TIME: time::Duration = time::Duration::from_millis(
/// A child process running in its own PGID, with a drop implementation that will kill that
/// PGID.
///
/// Will optionally attempt a graceful shutdown first using `SIGINT`.
///
/// TODO: If this API is useful, we should consider extending it to parented Nailgun processes
/// and to all local execution in general. It could also be adjusted for sending other posix
/// signals in sequence for https://github.com/pantsbuild/pants/issues/13230.
pub struct ManagedChild {
child: Child,
graceful_shutdown_timeout: time::Duration,
killed: AtomicBool,
graceful_shutdown_timeout: Option<time::Duration>,
killed: bool,
}

impl ManagedChild {
pub fn spawn(
mut command: Command,
graceful_shutdown_timeout: time::Duration,
) -> Result<Self, String> {
command: &mut Command,
graceful_shutdown_timeout: Option<time::Duration>,
) -> std::io::Result<Self> {
// Set `kill_on_drop` to encourage `tokio` to `wait` the process via its own "reaping"
// mechanism:
// see https://docs.rs/tokio/1.14.0/tokio/process/struct.Command.html#method.kill_on_drop
Expand All @@ -45,13 +46,11 @@ impl ManagedChild {
};

// Then spawn.
let child = command
.spawn()
.map_err(|e| format!("Error executing interactive process: {}", e))?;
let child = command.spawn()?;
Ok(Self {
child,
graceful_shutdown_timeout,
killed: AtomicBool::new(false),
killed: false,
})
}

Expand Down Expand Up @@ -98,49 +97,63 @@ impl ManagedChild {
&mut self,
max_wait_duration: time::Duration,
) -> Result<bool, String> {
let maybe_id = self.child.id();
let deadline = time::Instant::now() + max_wait_duration;
while time::Instant::now() <= deadline {
if self.check_child_has_exited()? {
return Ok(true);
}
log::debug!("Waiting for {:?} to exit...", maybe_id);
thread::sleep(GRACEFUL_SHUTDOWN_POLL_TIME);
}
// if we get here we have timed-out
// If we get here we have timed-out.
Ok(false)
}

/// Attempt to gracefully shutdown the process.
/// Attempt to shutdown the process (gracefully, if was configured that way at creation).
///
/// This will send a SIGINT to the process and give it a chance to shutdown gracefully. If the
/// Graceful shutdown will send a SIGINT to the process and give it a chance to exit. If the
/// process does not respond to the SIGINT within a fixed interval, a SIGKILL will be sent.
///
/// This method *will* block the current thread but will do so for a bounded amount of time.
pub fn graceful_shutdown_sync(&mut self) -> Result<(), String> {
self.signal_pg(signal::Signal::SIGINT)?;
match self.wait_for_child_exit_sync(self.graceful_shutdown_timeout) {
Ok(true) => {
// process was gracefully shutdown
self.killed.store(true, Ordering::SeqCst);
Ok(())
}
Ok(false) => {
// we timed out waiting for the child to exit, so we need to kill it.
log::warn!(
"Timed out waiting for graceful shutdown of process group. Will try SIGKILL instead."
);
self.kill_pgid()
}
Err(e) => {
log::warn!("An error occurred while waiting for graceful shutdown of process group ({}). Will try SIGKILL instead.", e);
self.kill_pgid()
/// NB: This method *will* block the current thread but it will do so for a bounded amount of time,
/// as long as the operating system responds to `SIGKILL` in a bounded amount of time.
///
/// TODO: Async drop might eventually allow for making this blocking more explicit.
/// <https://rust-lang.github.io/async-fundamentals-initiative/roadmap/async_drop.html>
pub fn attempt_shutdown_sync(&mut self) -> Result<(), String> {
if let Some(graceful_shutdown_timeout) = self.graceful_shutdown_timeout {
// If we fail to send SIGINT, then we will also fail to send SIGKILL, so we return eagerly
// on error here.
self.signal_pg(signal::Signal::SIGINT)?;
match self.wait_for_child_exit_sync(graceful_shutdown_timeout) {
Ok(true) => {
// Process was gracefully shutdown: return.
self.killed = true;
return Ok(());
}
Ok(false) => {
// We timed out waiting for the child to exit, so we need to kill it.
log::warn!(
"Timed out waiting for graceful shutdown of process group. Will try SIGKILL instead."
);
}
Err(e) => {
log::warn!("An error occurred while waiting for graceful shutdown of process group ({}). Will try SIGKILL instead.", e);
}
}
}

self.kill_pgid()
}

/// Kill the process's unique PGID or return an error if we don't have a PID or cannot kill.
fn kill_pgid(&mut self) -> Result<(), String> {
self.signal_pg(signal::Signal::SIGKILL)?;
self.killed.store(true, Ordering::SeqCst);
// NB: Since the SIGKILL was successfully delivered above, the only things that could cause the
// child not to eventually exit would be if it had become a zombie (which shouldn't be possible,
// because we are its parent process, and we are still alive).
let _ = self.wait_for_child_exit_sync(time::Duration::from_secs(1800))?;
self.killed = true;
Ok(())
}
}
Expand All @@ -162,8 +175,8 @@ impl DerefMut for ManagedChild {
/// Implements drop by killing the process group.
impl Drop for ManagedChild {
fn drop(&mut self) {
if !self.killed.load(Ordering::SeqCst) {
let _ = self.graceful_shutdown_sync();
if !self.killed {
let _ = self.attempt_shutdown_sync();
}
}
}
94 changes: 27 additions & 67 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::ffi::OsStr;
use std::fmt::{self, Debug};
use std::fs::create_dir_all;
use std::io::Write;
Expand Down Expand Up @@ -28,15 +29,15 @@ use shell_quote::bash;
use store::{OneOffStoreFileByDigest, Snapshot, Store, StoreError};
use task_executor::Executor;
use tempfile::TempDir;
use tokio::process::{Child, Command};
use tokio::process::Command;
use tokio::sync::RwLock;
use tokio::time::{timeout, Duration};
use tokio_util::codec::{BytesCodec, FramedRead};
use workunit_store::{in_workunit, Level, Metric, RunningWorkunit};

use crate::{
Context, FallibleProcessResultWithPlatform, ImmutableInputs, NamedCaches, Platform, Process,
ProcessError, ProcessResultMetadata, ProcessResultSource,
Context, FallibleProcessResultWithPlatform, ImmutableInputs, ManagedChild, NamedCaches, Platform,
Process, ProcessError, ProcessResultMetadata, ProcessResultSource,
};

pub const USER_EXECUTABLE_MODE: u32 = 0o100755;
Expand Down Expand Up @@ -151,66 +152,6 @@ impl Debug for CommandRunner {
}
}

pub struct HermeticCommand {
inner: Command,
}

///
/// A command that accepts no input stream and does not consult the `PATH`.
///
impl HermeticCommand {
fn new<S: AsRef<OsStr>>(program: S) -> HermeticCommand {
let mut inner = Command::new(program);
inner
// TODO: This will not universally prevent child processes continuing to run in the
// background, because killing a pantsd client with Ctrl+C kills the server with a signal,
// which won't currently result in an orderly dropping of everything in the graph. See #10004.
.kill_on_drop(true)
.env_clear()
// It would be really nice not to have to manually set PATH but this is sadly the only way
// to stop automatic PATH searching.
.env("PATH", "");
HermeticCommand { inner }
}

fn args<I, S>(&mut self, args: I) -> &mut HermeticCommand
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
self.inner.args(args);
self
}

fn envs<I, K, V>(&mut self, vars: I) -> &mut HermeticCommand
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.inner.envs(vars);
self
}

fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut HermeticCommand {
self.inner.current_dir(dir);
self
}

fn spawn<O: Into<Stdio>, E: Into<Stdio>>(
&mut self,
stdout: O,
stderr: E,
) -> std::io::Result<Child> {
self
.inner
.stdin(Stdio::null())
.stdout(stdout)
.stderr(stderr)
.spawn()
}
}

// TODO: A Stream that ends with `Exit` is error prone: we should consider creating a Child struct
// similar to nails::server::Child (which is itself shaped like `std::process::Child`).
// See https://github.com/stuhood/nails/issues/1 for more info.
Expand Down Expand Up @@ -286,6 +227,10 @@ impl super::CommandRunner for CommandRunner {
.await?;

workunit.increment_counter(Metric::LocalExecutionRequests, 1);
// NB: The constraint on `CapturedWorkdir` is that any child processes spawned here have
// exited (or been killed in their `Drop` handlers), so this function can rely on the usual
// Drop order of local variables to assume that the sandbox is cleaned up after the process
// is.
let res = self
.run_and_capture_workdir(
req.clone(),
Expand Down Expand Up @@ -341,8 +286,18 @@ impl CapturedWorkdir for CommandRunner {
} else {
workdir_path.to_owned()
};
let mut command = HermeticCommand::new(&req.argv[0]);
command.args(&req.argv[1..]).current_dir(cwd).envs(&req.env);
let mut command = Command::new(&req.argv[0]);
command
.env_clear()
// It would be really nice not to have to manually set PATH but this is sadly the only way
// to stop automatic PATH searching.
.env("PATH", "")
.args(&req.argv[1..])
.current_dir(cwd)
.envs(&req.env)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());

// See the documentation of the `CapturedWorkdir::run_in_workdir` method, but `exclusive_spawn`
// indicates the binary we're spawning was written out by the current thread, and, as such,
Expand All @@ -361,7 +316,7 @@ impl CapturedWorkdir for CommandRunner {
//
// See: https://github.com/golang/go/issues/22315 for an excellent description of this generic
// unix problem.
let mut fork_exec = move || command.spawn(Stdio::piped(), Stdio::piped());
let mut fork_exec = move || ManagedChild::spawn(&mut command, None);
let mut child = {
if exclusive_spawn {
let _write_locked = self.spawn_lock.write().await;
Expand Down Expand Up @@ -561,6 +516,11 @@ pub trait CapturedWorkdir {
///
/// Spawn the given process in a working directory prepared with its expected input digest.
///
/// NB: The implementer of this method must guarantee that the spawned process has completely
/// exited when the returned BoxStream is Dropped. Otherwise it might be possible for the process
/// to observe the working directory that it is running in being torn down. In most cases, this
/// requires Drop handlers to synchronously wait for their child processes to exit.
///
/// If the process to be executed has an `argv[0]` that points into its input digest then
/// `exclusive_spawn` will be `true` and the spawn implementation should account for the
/// possibility of concurrent fork+exec holding open the cloned `argv[0]` file descriptor, which,
Expand Down
94 changes: 48 additions & 46 deletions src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,53 +585,55 @@ fn interactive_process(
task_side_effected()?;
}

let exit_status = session.clone()
.with_console_ui_disabled(async move {
// Once any UI is torn down, grab exclusive access to the console.
let (term_stdin, term_stdout, term_stderr) =
stdio::get_destination().exclusive_start(Box::new(|_| {
// A stdio handler that will immediately trigger logging.
Err(())
}))?;
// NB: Command's stdio methods take ownership of a file-like to use, so we use
// `TryCloneAsFile` here to `dup` our thread-local stdio.
command
.stdin(Stdio::from(
term_stdin
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stdin: {}", e))?,
))
.stdout(Stdio::from(
term_stdout
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stdout: {}", e))?,
))
.stderr(Stdio::from(
term_stderr
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stderr: {}", e))?,
));
let mut subprocess = ManagedChild::spawn(command, context.core.graceful_shutdown_timeout)?;
tokio::select! {
_ = session.cancelled() => {
// The Session was cancelled: attempt to kill the process group / process, and
// then wait for it to exit (to avoid zombies).
if let Err(e) = subprocess.graceful_shutdown_sync() {
// Failed to kill the PGID: try the non-group form.
log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\
This is unexpected: please file an issue about this problem at \
[https://github.com/pantsbuild/pants/issues/new]", e);
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?;
};
subprocess.wait().await.map_err(|e| e.to_string())
let exit_status = session.clone()
.with_console_ui_disabled(async move {
// Once any UI is torn down, grab exclusive access to the console.
let (term_stdin, term_stdout, term_stderr) =
stdio::get_destination().exclusive_start(Box::new(|_| {
// A stdio handler that will immediately trigger logging.
Err(())
}))?;
// NB: Command's stdio methods take ownership of a file-like to use, so we use
// `TryCloneAsFile` here to `dup` our thread-local stdio.
command
.stdin(Stdio::from(
term_stdin
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stdin: {e}"))?,
))
.stdout(Stdio::from(
term_stdout
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stdout: {e}"))?,
))
.stderr(Stdio::from(
term_stderr
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stderr: {e}"))?,
));
let mut subprocess =
ManagedChild::spawn(&mut command, Some(context.core.graceful_shutdown_timeout))
.map_err(|e| format!("Error executing interactive process: {e}"))?;
tokio::select! {
_ = session.cancelled() => {
// The Session was cancelled: attempt to kill the process group / process, and
// then wait for it to exit (to avoid zombies).
if let Err(e) = subprocess.attempt_shutdown_sync() {
// Failed to kill the PGID: try the non-group form.
log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\
This is unexpected: please file an issue about this problem at \
[https://github.com/pantsbuild/pants/issues/new]", e);
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {e}")).await?;
};
subprocess.wait().await.map_err(|e| e.to_string())
}
exit_status = subprocess.wait() => {
// The process exited.
exit_status.map_err(|e| e.to_string())
}
}
exit_status = subprocess.wait() => {
// The process exited.
exit_status.map_err(|e| e.to_string())
}
}
})
.await?;
})
.await?;

let code = exit_status.code().unwrap_or(-1);
if keep_sandboxes == KeepSandboxes::OnFailure && code != 0 {
Expand Down

0 comments on commit 122d798

Please sign in to comment.