Skip to content

Commit

Permalink
Merge pull request #1111 from spkenv/spfs-fuse-heartbeat
Browse files Browse the repository at this point in the history
Add a heartbeat between spfs-monitor and spfs-fuse
  • Loading branch information
jrray authored Sep 3, 2024
2 parents 8329333 + 7f961cb commit 97ab65a
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 20 deletions.
97 changes: 84 additions & 13 deletions crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
// SPDX-License-Identifier: Apache-2.0
// https://github.com/spkenv/spk

use std::sync::Arc;
use std::time::Duration;

use clap::Parser;
use fuser::MountOption;
use miette::{bail, miette, Context, IntoDiagnostic, Result};
use spfs::tracking::EnvSpec;
use spfs::Error;
use spfs_cli_common as cli;
use spfs_cli_common::{self as cli, warn_and_sentry_event};
use spfs_vfs::{Config, Session};
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::timeout;

// The runtime setup process manages the current namespace
// which operates only on the current thread. For this reason
Expand Down Expand Up @@ -198,13 +202,10 @@ impl CmdFuse {
// introspected and unmounted by other parts of spfs such as the monitor
tracing::debug!("Establishing fuse session...");
let mount_opts = opts.mount_options.iter().cloned().collect::<Vec<_>>();
let mut session = fuser::Session::new(
Session::new(self.reference.clone(), opts.clone()),
&mountpoint,
&mount_opts,
)
.into_diagnostic()
.wrap_err("Failed to create a FUSE session")?;
let session = Session::new(self.reference.clone(), opts.clone());
let mut fuser_session = fuser::Session::new(session.clone(), &mountpoint, &mount_opts)
.into_diagnostic()
.wrap_err("Failed to create a FUSE session")?;

if opts.gid != calling_gid {
nix::unistd::setgid(opts.gid)
Expand Down Expand Up @@ -250,16 +251,45 @@ impl CmdFuse {
let mut interrupt = signal(SignalKind::interrupt()).into_diagnostic().wrap_err("interrupt signal handler")?;
let mut quit = signal(SignalKind::quit()).into_diagnostic().wrap_err("quit signal handler")?;
let mut terminate = signal(SignalKind::terminate()).into_diagnostic().wrap_err("terminate signal handler")?;
let (heartbeat_send, mut heartbeat_recv) = tokio::sync::mpsc::channel(1);

tracing::info!("Starting FUSE filesystem");
// Although the filesystem could run in the current thread, we prefer to
// create a blocking future that can move into tokio and be managed/scheduled
// as desired, otherwise this thread will block and may affect the runtime
// operation unpredictably
let join_handle = tokio::task::spawn_blocking(move || session.run());
let abort_handle = join_handle.abort_handle();
let unmount_callable = Arc::new(std::sync::Mutex::new(fuser_session.unmount_callable()));
let mut join_handle = tokio::task::spawn_blocking(move || fuser_session.run());

let mut heartbeat_monitor = config.fuse.enable_heartbeat.then(|| {
let heartbeat_interval_seconds = config.fuse.heartbeat_interval_seconds.get();
let heartbeat_grace_period_seconds = config.fuse.heartbeat_grace_period_seconds.get();
// Don't move the [only] sender or if heartbeats are not
// enabled it will be dropped and trigger the receiving end.
let heartbeat_send = heartbeat_send.clone();
tokio::task::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30.min((heartbeat_interval_seconds / 2).max(1))));

loop {
interval.tick().await;
let seconds = session.seconds_since_last_heartbeat();
tracing::trace!(seconds_since_last_heartbeat = ?seconds, "heartbeat monitor");
if seconds > heartbeat_grace_period_seconds {
warn_and_sentry_event!("loss of heartbeat, shutting down filesystem");

// XXX: Calling unmount here has no apparent effect!
heartbeat_send.send(()).await.into_diagnostic().wrap_err("Failed to send unmount signal")?;
break;
}
}

Ok::<_, miette::Report>(())
})
});

let mut heartbeat_failed = false;
let res = tokio::select!{
res = join_handle => {
res = &mut join_handle => {
tracing::info!("Filesystem shutting down");
res.into_diagnostic().wrap_err("FUSE session failed")
}
Expand All @@ -268,11 +298,52 @@ impl CmdFuse {
_ = terminate.recv() => Err(miette!("Terminate signal received, filesystem shutting down")),
_ = interrupt.recv() => Err(miette!("Interrupt signal received, filesystem shutting down")),
_ = quit.recv() => Err(miette!("Quit signal received, filesystem shutting down")),
_ = heartbeat_recv.recv() => {
heartbeat_failed = true;
Err(miette!("Heartbeat monitor triggered, filesystem shutting down"))
}
};
// the filesystem task must be fully terminated in order for the subsequent unmount

if let Some(handle) = heartbeat_monitor.take() {
handle.abort_handle().abort();
if let Err(err) = handle.await {
tracing::warn!("Heartbeat monitor failed: {err:?}");
}
}

// The filesystem task must be fully terminated in order for the subsequent unmount
// process to function. Otherwise, the background task will keep this process alive
// forever.
abort_handle.abort();
//
// Exception: if spfs-monitor died without cleaning up the runtime,
// fuser::Session::run will not exit and there's no way to tell it
// to break out of its loop. When the fuse filesystem is included
// in the lowerdir of an overlayfs, even though `fusermount -u`
// appears to succeed, and the mount disappears from /etc/mtab,
// spfs-fuse stays alive and still functions when accessed via the
// overlayfs. As a last-ditch effort to avoid leaving a spfs-fuse
// process running forever, we will return without attempting any
// cleanup.
if heartbeat_failed {
return res;
}
if !join_handle.is_finished() {
// XXX: Calling unmount has no apparent effect!
unmount_callable.lock().unwrap().unmount().into_diagnostic().wrap_err("FUSE unmount failed")?;
tracing::trace!("Joining FUSE session");
// Since the umount above may have no effect, this join uses a
// timeout so the process doesn't deadlock. Once this process
// exits, the fusermount auto_unmount should kick in.
match timeout(Duration::from_secs(5), join_handle).await {
Ok(r) => {
tracing::trace!("FUSE session joined");
r.into_diagnostic().wrap_err("FUSE join_handle await failed")?.into_diagnostic().wrap_err("FUSE session failed after unmount")?;
}
Err(_) => {
tracing::warn!("FUSE session join timed out");
}
}
}
res
});

Expand Down
6 changes: 3 additions & 3 deletions crates/spfs-cli/cmd-monitor/src/cmd_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl CmdMonitor {
.build()
.into_diagnostic()
.wrap_err("Failed to establish async runtime")?;
let code = rt.block_on(self.run_async())?;
let code = rt.block_on(self.run_async(config))?;
// the monitor is running in the background and, although not expected,
// can take extra time to shutdown if needed
rt.shutdown_timeout(std::time::Duration::from_secs(5));
Expand Down Expand Up @@ -141,7 +141,7 @@ impl CmdMonitor {
}
}

pub async fn run_async(&mut self) -> Result<i32> {
pub async fn run_async(&mut self, config: &spfs::Config) -> Result<i32> {
let mut interrupt = signal(SignalKind::interrupt())
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
let mut quit = signal(SignalKind::quit())
Expand All @@ -157,7 +157,7 @@ impl CmdMonitor {
let mut owned = spfs::runtime::OwnedRuntime::upgrade_as_monitor(runtime).await?;
tracing::trace!("upgraded to owned runtime, waiting for empty runtime");

let fut = spfs::monitor::wait_for_empty_runtime(&owned);
let fut = spfs::monitor::wait_for_empty_runtime(&owned, config);
let res = tokio::select! {
res = fut => {
tracing::info!("Monitor detected no more processes, cleaning up runtime...");
Expand Down
21 changes: 19 additions & 2 deletions crates/spfs-cli/common/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ pub struct Logging {
#[clap(skip)]
pub syslog: bool,

/// Enables timestamp in logging
/// Enables timestamp in logging (always enabled in file log)
#[clap(long, global = true, env = "SPFS_LOG_TIMESTAMP")]
pub timestamp: bool,
}
Expand Down Expand Up @@ -403,7 +403,11 @@ impl Logging {
})
.map(|log_file| {
let layer = fmt_layer().with_writer(log_file);
let layer = configure_timestamp!(layer, self.timestamp).with_filter(env_filter());
let layer = configure_timestamp!(layer, {
// file logs should always have a timestamp (fight me!)
true
})
.with_filter(env_filter());
without_sentry_target!(layer)
});

Expand All @@ -422,6 +426,19 @@ impl Logging {
}
}

/// Log a message at the warning level and also generate a sentry event if
/// sentry is enabled.
#[macro_export]
macro_rules! warn_and_sentry_event {
($($arg:tt)*) => {
#[cfg(feature = "sentry")]
{
tracing::error!(target: "sentry", $($arg)*);
}
tracing::warn!($($arg)*);
};
}

/// Command line flags for viewing annotations in a runtime
#[derive(Debug, Clone, clap::Args)]
pub struct AnnotationViewing {
Expand Down
33 changes: 33 additions & 0 deletions crates/spfs-vfs/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::ffi::{OsStr, OsString};
use std::io::{Seek, SeekFrom};
use std::mem::ManuallyDrop;
use std::os::fd::{AsRawFd, FromRawFd};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::prelude::FileExt;
#[cfg(feature = "fuse-backend-abi-7-31")]
use std::pin::Pin;
Expand Down Expand Up @@ -686,6 +687,7 @@ impl Filesystem {
/// This implements the [`fuser::Filesystem`] trait, receives
/// all requests and arranges for their async execution in the
/// spfs virtual filesystem.
#[derive(Clone)]
pub struct Session {
inner: Arc<SessionInner>,
}
Expand All @@ -694,20 +696,35 @@ impl Session {
/// Construct a new session which serves the provided reference
/// in its filesystem
pub fn new(reference: EnvSpec, opts: Config) -> Self {
let session_start = tokio::time::Instant::now();

Self {
inner: Arc::new(SessionInner {
opts,
reference,
fs: tokio::sync::OnceCell::new(),
session_start,
last_heartbeat_seconds_since_session_start: AtomicU64::new(0),
}),
}
}

/// Return the number of seconds since the last heartbeat was received
pub fn seconds_since_last_heartbeat(&self) -> u64 {
self.inner.session_start.elapsed().as_secs()
- self
.inner
.last_heartbeat_seconds_since_session_start
.load(Ordering::Relaxed)
}
}

struct SessionInner {
opts: Config,
reference: EnvSpec,
fs: tokio::sync::OnceCell<Arc<Filesystem>>,
session_start: tokio::time::Instant,
last_heartbeat_seconds_since_session_start: AtomicU64,
}

impl SessionInner {
Expand Down Expand Up @@ -790,6 +807,22 @@ impl fuser::Filesystem for Session {
}

fn lookup(&mut self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) {
if name
.as_bytes()
.starts_with(spfs::config::Fuse::HEARTBEAT_FILENAME_PREFIX.as_bytes())
{
let seconds_since_session_start = self.inner.session_start.elapsed().as_secs();
tracing::trace!(?seconds_since_session_start, "heard heartbeat");
self.inner
.last_heartbeat_seconds_since_session_start
.store(seconds_since_session_start, Ordering::Relaxed);

// The heartbeat filename is sufficiently unique that the reply can
// be sent without doing any real I/O on the backing filesystem.
reply.error(libc::ENOENT);
return;
}

let name = name.to_owned();
let session = Arc::clone(&self.inner);
tokio::task::spawn(async move {
Expand Down
34 changes: 33 additions & 1 deletion crates/spfs/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
// https://github.com/spkenv/spk

use std::num::NonZeroUsize;
use std::num::{NonZeroU64, NonZeroUsize};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

Expand Down Expand Up @@ -57,6 +57,16 @@ const fn default_monitor_max_blocking_threads() -> NonZeroUsize {
unsafe { NonZeroUsize::new_unchecked(2) }
}

const fn default_fuse_heartbeat_interval_seconds() -> NonZeroU64 {
// Safety: this is a hard-coded non-zero value
unsafe { NonZeroU64::new_unchecked(60) }
}

const fn default_fuse_heartbeat_grace_period_seconds() -> NonZeroU64 {
// Safety: this is a hard-coded non-zero value
unsafe { NonZeroU64::new_unchecked(300) }
}

static CONFIG: OnceCell<RwLock<Arc<Config>>> = OnceCell::new();

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -382,13 +392,35 @@ pub struct Fuse {
pub worker_threads: NonZeroUsize,
#[serde(default = "default_fuse_max_blocking_threads")]
pub max_blocking_threads: NonZeroUsize,
/// Enable a heartbeat between spfs-monitor and spfs-fuse. If spfs-monitor
/// stops sending a heartbeat, spfs-fuse will shut down.
pub enable_heartbeat: bool,
/// How often to send a heartbeat, in seconds
#[serde(default = "default_fuse_heartbeat_interval_seconds")]
pub heartbeat_interval_seconds: NonZeroU64,
/// How long to allow not receiving a heartbeat before shutting down, in
/// seconds
#[serde(default = "default_fuse_heartbeat_grace_period_seconds")]
pub heartbeat_grace_period_seconds: NonZeroU64,
}

impl Fuse {
/// The prefix for the heartbeat file name when heartbeats are enabled.
/// This prefix contains a UUID so that the fuse backend can reasonably
/// assume any attempt to access a file with this prefix is a heartbeat and
/// does not need to process the related file I/O normally.
pub const HEARTBEAT_FILENAME_PREFIX: &'static str =
".spfs-heartbeat-436cd8d6-60d1-11ef-9c93-00155dab73c6-";
}

impl Default for Fuse {
fn default() -> Self {
Self {
worker_threads: default_fuse_worker_threads(),
max_blocking_threads: default_fuse_max_blocking_threads(),
enable_heartbeat: false,
heartbeat_interval_seconds: default_fuse_heartbeat_interval_seconds(),
heartbeat_grace_period_seconds: default_fuse_heartbeat_grace_period_seconds(),
}
}
}
Expand Down
23 changes: 22 additions & 1 deletion crates/spfs/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub fn spawn_monitor_for_runtime(rt: &runtime::Runtime) -> Result<tokio::process
///
/// This is a privileged operation that may fail with a permission
/// issue if the calling process is not root or CAP_NET_ADMIN
pub async fn wait_for_empty_runtime(rt: &runtime::Runtime) -> Result<()> {
pub async fn wait_for_empty_runtime(rt: &runtime::Runtime, config: &crate::Config) -> Result<()> {
let pid = match rt.status.owner {
None => return Err(Error::RuntimeNotInitialized(rt.name().into())),
Some(pid) => pid,
Expand Down Expand Up @@ -295,6 +295,12 @@ pub async fn wait_for_empty_runtime(rt: &runtime::Runtime) -> Result<()> {
const LOG_UPDATE_INTERVAL: tokio::time::Duration = tokio::time::Duration::from_secs(5);
let mut log_update_deadline = tokio::time::Instant::now() + LOG_UPDATE_INTERVAL;

let spfs_heartbeat_interval: tokio::time::Duration =
tokio::time::Duration::from_secs(config.fuse.heartbeat_interval_seconds.get());
let mut spfs_heartbeat_deadline = tokio::time::Instant::now() + spfs_heartbeat_interval;

let enable_heartbeat = config.fuse.enable_heartbeat && rt.is_backend_fuse();

while let Some(event) = events_stream.next().await {
let no_more_processes = tracked_processes.is_empty();

Expand All @@ -307,6 +313,21 @@ pub async fn wait_for_empty_runtime(rt: &runtime::Runtime) -> Result<()> {
log_update_deadline = now + LOG_UPDATE_INTERVAL;
}

if enable_heartbeat && now >= spfs_heartbeat_deadline {
// Tickle the spfs filesystem to let `spfs-fuse` know we're still
// alive. This is a read operation to avoid issues with ro mounts
// or modifying any content in /spfs.
// The filename has a unique component to avoid any caching.
let _ = tokio::fs::symlink_metadata(format!(
"/spfs/{}{}",
crate::config::Fuse::HEARTBEAT_FILENAME_PREFIX,
ulid::Ulid::new()
))
.await;

spfs_heartbeat_deadline = now + spfs_heartbeat_interval;
}

if no_more_processes {
// If the mount namespace is known, verify there really aren't any
// more processes in the namespace. Since the polling might not deliver
Expand Down
Loading

0 comments on commit 97ab65a

Please sign in to comment.