Skip to content

Commit

Permalink
perform invalidation directly on writes (vercel/turborepo#5786)
Browse files Browse the repository at this point in the history
### Description

This changes the `write` function in the filesystem to invalidate the
path directly instead of going through the watcher. This makes it easier
to find write conflicts as they will directly hang the compilation.
Tracing can be used to print out the cycle of writes.


Closes WEB-1427
  • Loading branch information
sokra authored Aug 25, 2023
1 parent d3811fa commit c1646bd
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 8 deletions.
39 changes: 39 additions & 0 deletions crates/turbo-tasks-fs/src/invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,42 @@ impl InvalidationReasonKind for WatchStartKind {
)
}
}

/// Invalidation was caused by a write operation on the filesystem
#[derive(PartialEq, Eq, Hash)]
pub struct Write {
pub path: String,
}

impl InvalidationReason for Write {
fn kind(&self) -> Option<StaticOrArc<dyn InvalidationReasonKind>> {
Some(StaticOrArc::Static(&WRITE_KIND))
}
}

impl Display for Write {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{} written", self.path)
}
}

/// Invalidation kind for [Write]
#[derive(PartialEq, Eq, Hash)]
struct WriteKind;

static WRITE_KIND: WriteKind = WriteKind;

impl InvalidationReasonKind for WriteKind {
fn fmt(
&self,
reasons: &IndexSet<StaticOrArc<dyn InvalidationReason>>,
f: &mut Formatter<'_>,
) -> std::fmt::Result {
write!(
f,
"{} files written ({}, ...)",
reasons.len(),
reasons[0].as_any().downcast_ref::<Write>().unwrap().path
)
}
}
60 changes: 54 additions & 6 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ use turbo_tasks_hash::hash_xxh3_hash64;
use util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys};
pub use virtual_fs::VirtualFileSystem;

use self::{invalidation::WatchStart, json::UnparseableJson, mutex_map::MutexMap};
use self::{
invalidation::{WatchStart, Write},
json::UnparseableJson,
mutex_map::MutexMap,
};
use crate::{
attach::AttachedFileSystem,
invalidation::WatchChange,
Expand Down Expand Up @@ -200,6 +204,20 @@ impl DiskFileSystem {
Ok(())
}

/// registers the path as an invalidator for the current task,
/// has to be called within a turbo-tasks function. It removes and returns
/// the current list of invalidators.
fn register_sole_invalidator(&self, path: &Path) -> Result<HashSet<Invalidator>> {
let invalidator = turbo_tasks::get_invalidator();
let mut invalidator_map = self.invalidator_map.lock().unwrap();
let old_invalidators = invalidator_map.insert(path_to_key(path), [invalidator].into());
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
if let Some(dir) = path.parent() {
self.watcher.ensure_watching(dir, self.root_path())?;
}
Ok(old_invalidators.unwrap_or_default())
}

/// registers the path as an invalidator for the current task,
/// has to be called within a turbo-tasks function
fn register_dir_invalidator(&self, path: &Path) -> Result<()> {
Expand Down Expand Up @@ -487,14 +505,33 @@ impl DiskFileSystem {
path.join(&*unix_to_sys(&fs_path.path))
})
}

fn invalidate_from_write(&self, full_path: &Path, invalidators: HashSet<Invalidator>) {
if !invalidators.is_empty() {
if let Some(path) = format_absolute_fs_path(full_path, &self.name, self.root_path()) {
if invalidators.len() == 1 {
let invalidator = invalidators.into_iter().next().unwrap();
invalidator.invalidate_with_reason(Write { path });
} else {
invalidators.into_iter().for_each(|invalidator| {
invalidator.invalidate_with_reason(Write { path: path.clone() });
});
}
} else {
invalidators.into_iter().for_each(|invalidator| {
invalidator.invalidate();
});
}
}
}
}

struct PathLockGuard<'a>(
RwLockReadGuard<'a, ()>,
mutex_map::MutexMapGuard<'a, PathBuf>,
);

fn format_absolute_fs_path(path: &Path, name: &str, root_path: &PathBuf) -> Option<String> {
fn format_absolute_fs_path(path: &Path, name: &str, root_path: &Path) -> Option<String> {
let path = if let Ok(rel_path) = path.strip_prefix(root_path) {
let path = if MAIN_SEPARATOR != '/' {
let rel_path = rel_path.to_string_lossy().replace(MAIN_SEPARATOR, "/");
Expand Down Expand Up @@ -710,22 +747,29 @@ impl FileSystem for DiskFileSystem {
let full_path = self.to_sys_path(fs_path).await?;
let content = content.await?;

// Track the file, so that we will rewrite it if it ever changes.
fs_path.track().await?;

let _lock = self.lock_path(&full_path).await;

// Track the file, so that we will rewrite it if it ever changes.
let old_invalidators = self.register_sole_invalidator(&full_path)?;

// We perform an untracked comparison here, so that this write is not dependent
// on a read's Vc<FileContent> (and the memory it holds). Our untracked read can
// be freed immediately. Given this is an output file, it's unlikely any Turbo
// code will need to read the file from disk into a Vc<FileContent>, so we're
// not wasting cycles.
let compare = content.streaming_compare(full_path.clone()).await?;
if compare == FileComparison::Equal {
if !old_invalidators.is_empty() {
let key = path_to_key(&full_path);
for i in old_invalidators {
self.invalidator_map.insert(key.clone(), i);
}
}
return Ok(Completion::unchanged());
}

let create_directory = compare == FileComparison::Create;

match &*content {
FileContent::Content(file) => {
if create_directory {
Expand Down Expand Up @@ -769,6 +813,8 @@ impl FileSystem for DiskFileSystem {
}
}

self.invalidate_from_write(&full_path, old_invalidators);

Ok(Completion::new())
}

Expand All @@ -779,6 +825,8 @@ impl FileSystem for DiskFileSystem {
target: Vc<LinkContent>,
) -> Result<Vc<Completion>> {
let full_path = self.to_sys_path(fs_path).await?;
// TODO(sokra) preform a untracked read here, register an invalidator and get
// all existing invalidators
let old_content = fs_path
.read_link()
.await
Expand Down Expand Up @@ -834,7 +882,7 @@ impl FileSystem for DiskFileSystem {
return Err(anyhow!("invalid symlink target: {}", full_path.display()));
}
LinkContent::NotFound => {
retry_future(|| fs::remove_file(full_path.clone()))
retry_future(|| fs::remove_file(&full_path))
.await
.or_else(|err| {
if err.kind() == ErrorKind::NotFound {
Expand Down
4 changes: 2 additions & 2 deletions crates/turbo-tasks-fs/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use tokio::task::spawn_blocking;

const MAX_RETRY_ATTEMPTS: usize = 10;

pub(crate) async fn retry_future<R, F, Fut>(func: F) -> io::Result<R>
pub(crate) async fn retry_future<'a, R, F, Fut>(func: F) -> io::Result<R>
where
F: FnMut() -> Fut + Unpin,
Fut: Future<Output = io::Result<R>>,
Fut: Future<Output = io::Result<R>> + 'a,
{
match FutureRetry::new(
func,
Expand Down

0 comments on commit c1646bd

Please sign in to comment.