Skip to content

Commit

Permalink
fsmonitor: initial Watchman Fsmonitor implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
arxanas committed Jun 11, 2022
1 parent 8078577 commit 4eb4d0e
Show file tree
Hide file tree
Showing 11 changed files with 586 additions and 33 deletions.
343 changes: 339 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ regex = "1.5.6"
serde_json = "1.0.81"
tempfile = "3.3.0"
thiserror = "1.0.31"
tokio = "1.18.2"
uuid = { version = "1.1.1", features = ["v4"] }
whoami = "1.2.1"
zstd = "0.11.2"
watchman_client = "0.7.2"

[dev-dependencies]
assert_matches = "1.5.0"
Expand Down
114 changes: 114 additions & 0 deletions lib/src/fsmonitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//! Interfaces with a filesystem monitor tool (currently Watchman) to
//! efficiently query for filesystem updates, without having to crawl the entire
//! working copy. This is particularly useful for large working copies, or for
//! working copies for which it's expensive to materialize files, such those
//! backed by a network or virtualized filesystem.
#![warn(missing_docs)]

use std::path::{Path, PathBuf};
use std::sync::Arc;

use itertools::Itertools;
use thiserror::Error;
use watchman_client::prelude::{NameOnly, QueryRequestCommon, QueryResult};

/// Represents an instance in time from the perspective of the filesystem
/// monitor.
///
/// This can be used to perform incremental queries. A given query will return
/// the associated clock. By passing the same clock into a future query, this
/// informs the filesystem monitor that we only wish to get changed files since
/// the previous point in time.
#[derive(Clone, Debug)]
pub struct FsmonitorClock(watchman_client::pdu::Clock);

#[allow(missing_docs)]
#[derive(Debug, Error)]
pub enum FsmonitorError {
#[error("Could not connect to Watchman: {0}")]
WatchmanConnectError(watchman_client::Error),

#[error("Could not canonicalize working copy root path: {0}")]
CanonicalizeRootError(std::io::Error),

#[error("Watchman failed to resolve the working copy root path: {0}")]
ResolveRootError(watchman_client::Error),

#[error("Failed to query Watchman: {0}")]
WatchmanQueryError(watchman_client::Error),
}

/// Handle to the underlying filesystem monitor (currently Watchman).
#[derive(Clone)]
pub struct Fsmonitor {
client: Arc<watchman_client::Client>,
resolved_root: watchman_client::ResolvedRoot,
}

impl Fsmonitor {
/// Initialize the filesystem monitor. If it's not already running, this
/// will start it and have it crawl the working copy to build up its
/// in-memory representation of the filesystem, which may take some time.
pub async fn init(working_copy_path: &Path) -> Result<Self, FsmonitorError> {
println!("Querying filesystem monitor (Watchman)...");
let connector = watchman_client::Connector::new();
let client = connector
.connect()
.await
.map_err(FsmonitorError::WatchmanConnectError)?;
let working_copy_root = watchman_client::CanonicalPath::canonicalize(working_copy_path)
.map_err(FsmonitorError::CanonicalizeRootError)?;
let resolved_root = client
.resolve_root(working_copy_root)
.await
.map_err(FsmonitorError::ResolveRootError)?;
Ok(Self {
client: Arc::new(client),
resolved_root,
})
}

/// Query for changed files since the previous point in time.
pub async fn query_changed_files(
&self,
_previous_clock: Option<FsmonitorClock>,
) -> Result<(FsmonitorClock, Option<Vec<PathBuf>>), FsmonitorError> {
let QueryResult {
version: _,
is_fresh_instance,
files,
clock,
state_enter: _,
state_leave: _,
state_metadata: _,
saved_state_info: _,
debug: _,
}: QueryResult<NameOnly> = self
.client
.query(
&self.resolved_root,
QueryRequestCommon {
..Default::default()
},
)
.await
.map_err(FsmonitorError::WatchmanQueryError)?;

let clock = FsmonitorClock(clock);
if is_fresh_instance {
// The Watchman documentation states that if it was a fresh
// instance, we need to delete any tree entries that didn't appear
// in the returned list of changed files. For now, the caller will
// handle this by manually crawling the working copy again.
Ok((clock, None))
} else {
let paths: Vec<PathBuf> = files
.unwrap_or_default()
.into_iter()
.map(|file_info| file_info.name.into_inner())
.collect_vec();
Ok((clock, Some(paths)))
}
}
}
1 change: 1 addition & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod dag_walk;
pub mod diff;
pub mod file_util;
pub mod files;
pub mod fsmonitor;
pub mod git;
pub mod git_backend;
pub mod gitignore;
Expand Down
6 changes: 6 additions & 0 deletions lib/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ impl UserSettings {
.unwrap_or_else(|_| Self::user_email_placeholder().to_string())
}

pub fn use_fsmonitor(&self) -> bool {
self.config
.get_bool("core.use-fsmonitor")
.unwrap_or_default()
}

pub fn user_email_placeholder() -> &'static str {
"(no email configured)"
}
Expand Down
111 changes: 101 additions & 10 deletions lib/src/working_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::backend::{
BackendError, ConflictId, FileId, MillisSinceEpoch, SymlinkId, TreeId, TreeValue,
};
use crate::conflicts::{materialize_conflict, update_conflict_from_content};
use crate::fsmonitor::{Fsmonitor, FsmonitorClock, FsmonitorError};
use crate::gitignore::GitIgnoreFile;
use crate::lock::FileLock;
use crate::matchers::{DifferenceMatcher, Matcher, PrefixMatcher};
Expand Down Expand Up @@ -207,6 +208,10 @@ pub struct CheckoutStats {

#[derive(Debug, Error)]
pub enum SnapshotError {
#[error("Failed to open file {path}: {err:?}")]
FileOpenError { path: PathBuf, err: std::io::Error },
#[error("Failed to query the filesystem monitor: {0}")]
FsmonitorError(#[from] FsmonitorError),
#[error("{message}: {err}")]
IoError {
message: String,
Expand Down Expand Up @@ -391,18 +396,92 @@ impl TreeState {
Ok(self.store.write_symlink(path, str_target)?)
}

#[tokio::main]
async fn query_fsmonitor(
&self,
) -> Result<(FsmonitorClock, Option<Vec<PathBuf>>), FsmonitorError> {
let working_copy_path = self.working_copy_path.to_owned();
tokio::spawn(async move {
let fsmonitor = Fsmonitor::init(&working_copy_path).await?;
let previous_clock = None;
fsmonitor.query_changed_files(previous_clock).await
})
.await
.unwrap()
}

/// Look for changes to the working copy. If there are any changes, create
/// a new tree from it and return it, and also update the dirstate on disk.
pub fn snapshot(&mut self, base_ignores: Arc<GitIgnoreFile>) -> Result<TreeId, SnapshotError> {
pub fn snapshot(
&mut self,
base_ignores: Arc<GitIgnoreFile>,
should_use_fsmonitor: bool,
) -> Result<TreeId, SnapshotError> {
let (_fsmonitor_clock, changed_files) = if should_use_fsmonitor {
match self.query_fsmonitor() {
Ok((fsmonitor_clock, changed_files)) => (Some(fsmonitor_clock), changed_files),
Err(err) => {
eprintln!("Failed to query fsmonitor: {}", err);
(None, None)
}
}
} else {
(None, None)
};

let sparse_matcher = self.sparse_matcher();
let mut work = vec![(
RepoPath::root(),
self.working_copy_path.clone(),
base_ignores,
)];
let mut tree_builder = self.store.tree_builder(self.tree_id.clone());
let mut deleted_files: HashSet<_> = self.file_states.keys().cloned().collect();
while let Some((dir, disk_dir, git_ignore)) = work.pop() {

struct WorkItem {
dir: RepoPath,
disk_dir: PathBuf,
git_ignore: Arc<GitIgnoreFile>,
should_recurse: bool,
}
let repo_root = RepoPath::root();
let repo_root_pathbuf = repo_root.to_fs_path(&self.working_copy_path);
let mut work: Vec<WorkItem> = match changed_files {
Some(changed_files) => changed_files
.into_iter()
.map(|path| {
let repo_path_components = path
.iter()
.map(|component| component.to_str().expect("Non-UTF-8 path component"))
.map(RepoPathComponent::from)
.collect();
let dir = RepoPath::from_components(repo_path_components);
WorkItem {
dir,
disk_dir: repo_root_pathbuf.join(path),

// FIXME: it's possible that we missed a `.gitignore`
// which belongs to a parent directory of the given
// file.
git_ignore: base_ignores.clone(),

should_recurse: false,
}
})
.collect(),

None => {
vec![WorkItem {
dir: repo_root,
disk_dir: self.working_copy_path.clone(),
git_ignore: base_ignores,
should_recurse: true,
}]
}
};

while let Some(WorkItem {
dir,
disk_dir,
git_ignore,
should_recurse,
}) = work.pop()
{
if sparse_matcher.visit(&dir).is_nothing() {
continue;
}
Expand All @@ -429,7 +508,15 @@ impl TreeState {
{
continue;
}
work.push((sub_path, entry.path(), git_ignore.clone()));

if should_recurse {
work.push(WorkItem {
dir: sub_path,
disk_dir: entry.path(),
git_ignore: git_ignore.clone(),
should_recurse: true,
});
}
} else {
deleted_files.remove(&sub_path);
if sparse_matcher.matches(&sub_path) {
Expand Down Expand Up @@ -1085,12 +1172,16 @@ impl LockedWorkingCopy<'_> {
// The base_ignores are passed in here rather than being set on the TreeState
// because the TreeState may be long-lived if the library is used in a
// long-lived process.
pub fn snapshot(&mut self, base_ignores: Arc<GitIgnoreFile>) -> Result<TreeId, SnapshotError> {
pub fn snapshot(
&mut self,
base_ignores: Arc<GitIgnoreFile>,
should_use_fsmonitor: bool,
) -> Result<TreeId, SnapshotError> {
self.wc
.tree_state()
.as_mut()
.unwrap()
.snapshot(base_ignores)
.snapshot(base_ignores, should_use_fsmonitor)
}

pub fn check_out(&mut self, new_tree: &Tree) -> Result<CheckoutStats, CheckoutError> {
Expand Down
Loading

0 comments on commit 4eb4d0e

Please sign in to comment.