diff --git a/src/rust/engine/process_execution/src/children.rs b/src/rust/engine/process_execution/src/children.rs index 34010a98677..e918cb3dd81 100644 --- a/src/rust/engine/process_execution/src/children.rs +++ b/src/rust/engine/process_execution/src/children.rs @@ -1,5 +1,4 @@ use std::ops::{Deref, DerefMut}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::{thread, time}; use nix::sys::signal; @@ -12,20 +11,22 @@ const GRACEFUL_SHUTDOWN_POLL_TIME: time::Duration = time::Duration::from_millis( /// A child process running in its own PGID, with a drop implementation that will kill that /// PGID. /// +/// Will optionally attempt a graceful shutdown first using `SIGINT`. +/// /// TODO: If this API is useful, we should consider extending it to parented Nailgun processes /// and to all local execution in general. It could also be adjusted for sending other posix /// signals in sequence for https://github.com/pantsbuild/pants/issues/13230. pub struct ManagedChild { child: Child, - graceful_shutdown_timeout: time::Duration, - killed: AtomicBool, + graceful_shutdown_timeout: Option, + killed: bool, } impl ManagedChild { pub fn spawn( - mut command: Command, - graceful_shutdown_timeout: time::Duration, - ) -> Result { + command: &mut Command, + graceful_shutdown_timeout: Option, + ) -> std::io::Result { // Set `kill_on_drop` to encourage `tokio` to `wait` the process via its own "reaping" // mechanism: // see https://docs.rs/tokio/1.14.0/tokio/process/struct.Command.html#method.kill_on_drop @@ -45,13 +46,11 @@ impl ManagedChild { }; // Then spawn. - let child = command - .spawn() - .map_err(|e| format!("Error executing interactive process: {}", e))?; + let child = command.spawn()?; Ok(Self { child, graceful_shutdown_timeout, - killed: AtomicBool::new(false), + killed: false, }) } @@ -98,49 +97,63 @@ impl ManagedChild { &mut self, max_wait_duration: time::Duration, ) -> Result { + let maybe_id = self.child.id(); let deadline = time::Instant::now() + max_wait_duration; while time::Instant::now() <= deadline { if self.check_child_has_exited()? { return Ok(true); } + log::debug!("Waiting for {:?} to exit...", maybe_id); thread::sleep(GRACEFUL_SHUTDOWN_POLL_TIME); } - // if we get here we have timed-out + // If we get here we have timed-out. Ok(false) } - /// Attempt to gracefully shutdown the process. + /// Attempt to shutdown the process (gracefully, if was configured that way at creation). /// - /// This will send a SIGINT to the process and give it a chance to shutdown gracefully. If the + /// Graceful shutdown will send a SIGINT to the process and give it a chance to exit. If the /// process does not respond to the SIGINT within a fixed interval, a SIGKILL will be sent. /// - /// This method *will* block the current thread but will do so for a bounded amount of time. - pub fn graceful_shutdown_sync(&mut self) -> Result<(), String> { - self.signal_pg(signal::Signal::SIGINT)?; - match self.wait_for_child_exit_sync(self.graceful_shutdown_timeout) { - Ok(true) => { - // process was gracefully shutdown - self.killed.store(true, Ordering::SeqCst); - Ok(()) - } - Ok(false) => { - // we timed out waiting for the child to exit, so we need to kill it. - log::warn!( - "Timed out waiting for graceful shutdown of process group. Will try SIGKILL instead." - ); - self.kill_pgid() - } - Err(e) => { - log::warn!("An error occurred while waiting for graceful shutdown of process group ({}). Will try SIGKILL instead.", e); - self.kill_pgid() + /// NB: This method *will* block the current thread but it will do so for a bounded amount of time, + /// as long as the operating system responds to `SIGKILL` in a bounded amount of time. + /// + /// TODO: Async drop might eventually allow for making this blocking more explicit. + /// + pub fn attempt_shutdown_sync(&mut self) -> Result<(), String> { + if let Some(graceful_shutdown_timeout) = self.graceful_shutdown_timeout { + // If we fail to send SIGINT, then we will also fail to send SIGKILL, so we return eagerly + // on error here. + self.signal_pg(signal::Signal::SIGINT)?; + match self.wait_for_child_exit_sync(graceful_shutdown_timeout) { + Ok(true) => { + // Process was gracefully shutdown: return. + self.killed = true; + return Ok(()); + } + Ok(false) => { + // We timed out waiting for the child to exit, so we need to kill it. + log::warn!( + "Timed out waiting for graceful shutdown of process group. Will try SIGKILL instead." + ); + } + Err(e) => { + log::warn!("An error occurred while waiting for graceful shutdown of process group ({}). Will try SIGKILL instead.", e); + } } } + + self.kill_pgid() } /// Kill the process's unique PGID or return an error if we don't have a PID or cannot kill. fn kill_pgid(&mut self) -> Result<(), String> { self.signal_pg(signal::Signal::SIGKILL)?; - self.killed.store(true, Ordering::SeqCst); + // NB: Since the SIGKILL was successfully delivered above, the only things that could cause the + // child not to eventually exit would be if it had become a zombie (which shouldn't be possible, + // because we are its parent process, and we are still alive). + let _ = self.wait_for_child_exit_sync(time::Duration::from_secs(1800))?; + self.killed = true; Ok(()) } } @@ -162,8 +175,8 @@ impl DerefMut for ManagedChild { /// Implements drop by killing the process group. impl Drop for ManagedChild { fn drop(&mut self) { - if !self.killed.load(Ordering::SeqCst) { - let _ = self.graceful_shutdown_sync(); + if !self.killed { + let _ = self.attempt_shutdown_sync(); } } } diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index e7897012c71..ef9d70dac48 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -1,5 +1,6 @@ +// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). use std::collections::{BTreeMap, BTreeSet, HashSet}; -use std::ffi::OsStr; use std::fmt::{self, Debug}; use std::fs::create_dir_all; use std::io::Write; @@ -28,15 +29,15 @@ use shell_quote::bash; use store::{OneOffStoreFileByDigest, Snapshot, Store, StoreError}; use task_executor::Executor; use tempfile::TempDir; -use tokio::process::{Child, Command}; +use tokio::process::Command; use tokio::sync::RwLock; use tokio::time::{timeout, Duration}; use tokio_util::codec::{BytesCodec, FramedRead}; use workunit_store::{in_workunit, Level, Metric, RunningWorkunit}; use crate::{ - Context, FallibleProcessResultWithPlatform, ImmutableInputs, NamedCaches, Platform, Process, - ProcessError, ProcessResultMetadata, ProcessResultSource, + Context, FallibleProcessResultWithPlatform, ImmutableInputs, ManagedChild, NamedCaches, Platform, + Process, ProcessError, ProcessResultMetadata, ProcessResultSource, }; pub const USER_EXECUTABLE_MODE: u32 = 0o100755; @@ -151,66 +152,6 @@ impl Debug for CommandRunner { } } -pub struct HermeticCommand { - inner: Command, -} - -/// -/// A command that accepts no input stream and does not consult the `PATH`. -/// -impl HermeticCommand { - fn new>(program: S) -> HermeticCommand { - let mut inner = Command::new(program); - inner - // TODO: This will not universally prevent child processes continuing to run in the - // background, because killing a pantsd client with Ctrl+C kills the server with a signal, - // which won't currently result in an orderly dropping of everything in the graph. See #10004. - .kill_on_drop(true) - .env_clear() - // It would be really nice not to have to manually set PATH but this is sadly the only way - // to stop automatic PATH searching. - .env("PATH", ""); - HermeticCommand { inner } - } - - fn args(&mut self, args: I) -> &mut HermeticCommand - where - I: IntoIterator, - S: AsRef, - { - self.inner.args(args); - self - } - - fn envs(&mut self, vars: I) -> &mut HermeticCommand - where - I: IntoIterator, - K: AsRef, - V: AsRef, - { - self.inner.envs(vars); - self - } - - fn current_dir>(&mut self, dir: P) -> &mut HermeticCommand { - self.inner.current_dir(dir); - self - } - - fn spawn, E: Into>( - &mut self, - stdout: O, - stderr: E, - ) -> std::io::Result { - self - .inner - .stdin(Stdio::null()) - .stdout(stdout) - .stderr(stderr) - .spawn() - } -} - // TODO: A Stream that ends with `Exit` is error prone: we should consider creating a Child struct // similar to nails::server::Child (which is itself shaped like `std::process::Child`). // See https://github.com/stuhood/nails/issues/1 for more info. @@ -286,6 +227,10 @@ impl super::CommandRunner for CommandRunner { .await?; workunit.increment_counter(Metric::LocalExecutionRequests, 1); + // NB: The constraint on `CapturedWorkdir` is that any child processes spawned here have + // exited (or been killed in their `Drop` handlers), so this function can rely on the usual + // Drop order of local variables to assume that the sandbox is cleaned up after the process + // is. let res = self .run_and_capture_workdir( req.clone(), @@ -341,8 +286,18 @@ impl CapturedWorkdir for CommandRunner { } else { workdir_path.to_owned() }; - let mut command = HermeticCommand::new(&req.argv[0]); - command.args(&req.argv[1..]).current_dir(cwd).envs(&req.env); + let mut command = Command::new(&req.argv[0]); + command + .env_clear() + // It would be really nice not to have to manually set PATH but this is sadly the only way + // to stop automatic PATH searching. + .env("PATH", "") + .args(&req.argv[1..]) + .current_dir(cwd) + .envs(&req.env) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); // See the documentation of the `CapturedWorkdir::run_in_workdir` method, but `exclusive_spawn` // indicates the binary we're spawning was written out by the current thread, and, as such, @@ -361,7 +316,7 @@ impl CapturedWorkdir for CommandRunner { // // See: https://github.com/golang/go/issues/22315 for an excellent description of this generic // unix problem. - let mut fork_exec = move || command.spawn(Stdio::piped(), Stdio::piped()); + let mut fork_exec = move || ManagedChild::spawn(&mut command, None); let mut child = { if exclusive_spawn { let _write_locked = self.spawn_lock.write().await; @@ -561,6 +516,11 @@ pub trait CapturedWorkdir { /// /// Spawn the given process in a working directory prepared with its expected input digest. /// + /// NB: The implementer of this method must guarantee that the spawned process has completely + /// exited when the returned BoxStream is Dropped. Otherwise it might be possible for the process + /// to observe the working directory that it is running in being torn down. In most cases, this + /// requires Drop handlers to synchronously wait for their child processes to exit. + /// /// If the process to be executed has an `argv[0]` that points into its input digest then /// `exclusive_spawn` will be `true` and the spawn implementation should account for the /// possibility of concurrent fork+exec holding open the cloned `argv[0]` file descriptor, which, diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index ae2224c6283..028afcde2a1 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -585,53 +585,55 @@ fn interactive_process( task_side_effected()?; } - let exit_status = session.clone() - .with_console_ui_disabled(async move { - // Once any UI is torn down, grab exclusive access to the console. - let (term_stdin, term_stdout, term_stderr) = - stdio::get_destination().exclusive_start(Box::new(|_| { - // A stdio handler that will immediately trigger logging. - Err(()) - }))?; - // NB: Command's stdio methods take ownership of a file-like to use, so we use - // `TryCloneAsFile` here to `dup` our thread-local stdio. - command - .stdin(Stdio::from( - term_stdin - .try_clone_as_file() - .map_err(|e| format!("Couldn't clone stdin: {}", e))?, - )) - .stdout(Stdio::from( - term_stdout - .try_clone_as_file() - .map_err(|e| format!("Couldn't clone stdout: {}", e))?, - )) - .stderr(Stdio::from( - term_stderr - .try_clone_as_file() - .map_err(|e| format!("Couldn't clone stderr: {}", e))?, - )); - let mut subprocess = ManagedChild::spawn(command, context.core.graceful_shutdown_timeout)?; - tokio::select! { - _ = session.cancelled() => { - // The Session was cancelled: attempt to kill the process group / process, and - // then wait for it to exit (to avoid zombies). - if let Err(e) = subprocess.graceful_shutdown_sync() { - // Failed to kill the PGID: try the non-group form. - log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\ - This is unexpected: please file an issue about this problem at \ - [https://github.com/pantsbuild/pants/issues/new]", e); - subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?; - }; - subprocess.wait().await.map_err(|e| e.to_string()) + let exit_status = session.clone() + .with_console_ui_disabled(async move { + // Once any UI is torn down, grab exclusive access to the console. + let (term_stdin, term_stdout, term_stderr) = + stdio::get_destination().exclusive_start(Box::new(|_| { + // A stdio handler that will immediately trigger logging. + Err(()) + }))?; + // NB: Command's stdio methods take ownership of a file-like to use, so we use + // `TryCloneAsFile` here to `dup` our thread-local stdio. + command + .stdin(Stdio::from( + term_stdin + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stdin: {e}"))?, + )) + .stdout(Stdio::from( + term_stdout + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stdout: {e}"))?, + )) + .stderr(Stdio::from( + term_stderr + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stderr: {e}"))?, + )); + let mut subprocess = + ManagedChild::spawn(&mut command, Some(context.core.graceful_shutdown_timeout)) + .map_err(|e| format!("Error executing interactive process: {e}"))?; + tokio::select! { + _ = session.cancelled() => { + // The Session was cancelled: attempt to kill the process group / process, and + // then wait for it to exit (to avoid zombies). + if let Err(e) = subprocess.attempt_shutdown_sync() { + // Failed to kill the PGID: try the non-group form. + log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\ + This is unexpected: please file an issue about this problem at \ + [https://github.com/pantsbuild/pants/issues/new]", e); + subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {e}")).await?; + }; + subprocess.wait().await.map_err(|e| e.to_string()) + } + exit_status = subprocess.wait() => { + // The process exited. + exit_status.map_err(|e| e.to_string()) + } } - exit_status = subprocess.wait() => { - // The process exited. - exit_status.map_err(|e| e.to_string()) - } - } - }) - .await?; + }) + .await?; let code = exit_status.code().unwrap_or(-1); if keep_sandboxes == KeepSandboxes::OnFailure && code != 0 {