Skip to content

Commit

Permalink
🎨 add SIGBUS startup failure warning message (#34)
Browse files Browse the repository at this point in the history
If we encounter a signal 7 or a SIGBUS signal error from the death of a
shard, we then log a warning recommending to add shared memory.

Signed-off-by: Prashant Gupta <[email protected]>
Co-authored-by: PRASHANT GUPTA <[email protected]>
  • Loading branch information
joerunde and prashantgupta24 authored Feb 21, 2024
1 parent 1f4cfbe commit 6c56f8f
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions launcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use nix::unistd::Pid;
use std::env;
use std::io::{BufRead, BufReader, ErrorKind, Write};
use std::path::Path;
use std::process::{Command, ExitCode, Stdio};
use std::process::{Command, ExitCode, ExitStatus, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::TryRecvError;
use std::sync::Arc;
Expand All @@ -16,7 +16,7 @@ use std::{fs, io};
use std::env::VarError;
use std::ffi::OsString;
use std::fs::File;
use std::os::unix::process::CommandExt;
use std::os::unix::process::{CommandExt, ExitStatusExt};
use tracing::{info, warn};

// In most cases this gives the best performance for inferencing
Expand Down Expand Up @@ -238,7 +238,7 @@ fn main() -> ExitCode {
Err(TryRecvError::Empty) => {
sleep(Duration::from_millis(100));
}
Ok(ShardStatus::Failed) => {
Ok(ShardStatus::Failed(_status)) => {
shutdown_shards(shutdown, shutdown_receiver);
return ExitCode::FAILURE;
}
Expand Down Expand Up @@ -347,9 +347,17 @@ fn main() -> ExitCode {
let mut exit_code = ExitCode::SUCCESS;

while running.load(Ordering::SeqCst) {
if let Ok(ShardStatus::Failed) = status_receiver.try_recv() {
if let Ok(ShardStatus::Failed(status)) = status_receiver.try_recv() {
exit_code = ExitCode::FAILURE;
break;
terminate_gracefully(&mut webserver, shutdown.clone(), shutdown_receiver);
if status.signal() == Some(7) && num_shard > 1 {
panic!(
"Encountered SIGBUS error. This is usually caused by NCCL having insufficient shared memory. \
Ensure at least 1GB of shared memory is available. In case of OpenShift/K8s, \
mount a memory medium emptyDir volume to /dev/shm"
)
}
return exit_code
};

match webserver.try_wait().expect("Error polling status of router process") {
Expand All @@ -362,17 +370,21 @@ fn main() -> ExitCode {
};
}

// Graceful termination
terminate_gracefully(&mut webserver, shutdown.clone(), shutdown_receiver);

exit_code
}

/// Graceful termination
fn terminate_gracefully(webserver: &mut std::process::Child, shutdown: Arc<Mutex<bool>>, shutdown_receiver: &mpsc::Receiver<()>) {
signal::kill(Pid::from_raw(webserver.id() as i32), Signal::SIGTERM).unwrap();
info!("Waiting for router to gracefully shutdown");
webserver.wait().unwrap();
info!("Router terminated");
shutdown_shards(shutdown, &shutdown_receiver);

exit_code
}


fn num_cuda_devices() -> Option<usize> {
let devices = match env::var("CUDA_VISIBLE_DEVICES") {
Ok(devices) => devices,
Expand Down Expand Up @@ -481,7 +493,7 @@ fn find_num_shards(num_shard: Option<usize>) -> usize {
#[derive(Debug)]
enum ShardStatus {
Ready,
Failed,
Failed(ExitStatus),
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -619,7 +631,7 @@ fn shard_manager(
} else {
tracing::error!("Shard {rank} failed to start:\n{err}");
}
status_sender.send(ShardStatus::Failed).unwrap();
status_sender.send(ShardStatus::Failed(ExitStatus::from_raw(0))).unwrap();
return
}
};
Expand Down Expand Up @@ -654,7 +666,9 @@ fn shard_manager(
io::stdout().flush().unwrap_or_default();
stderr_thread.join().unwrap_or_default();
io::stderr().flush().unwrap_or_default();
status_sender.send(ShardStatus::Failed).unwrap();
status_sender
.send(ShardStatus::Failed(status))
.unwrap();
}
return
}
Expand Down

0 comments on commit 6c56f8f

Please sign in to comment.