diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a6a7d947..7701fb74 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -42,6 +42,9 @@ jobs: - name: Build Release run: cargo build --verbose --release + - name: Run `ark-cli watch` test + run: ./integration/ark-cli-watch.sh + - name: Install JDK uses: actions/setup-java@v4.2.1 with: @@ -71,6 +74,9 @@ jobs: - name: Run tests run: cargo test --workspace --verbose + - name: Run `ark-cli watch` test + run: ./integration/ark-cli-watch.sh + - name: Install JDK uses: actions/setup-java@v4.2.1 with: @@ -100,6 +106,9 @@ jobs: - name: Run tests run: cargo test --workspace --verbose + - name: Run `ark-cli watch` test + run: ./integration/ark-cli-watch.sh + - name: Install JDK uses: actions/setup-java@v4.2.1 with: diff --git a/ark-cli/Cargo.toml b/ark-cli/Cargo.toml index 08655206..ea661d3d 100644 --- a/ark-cli/Cargo.toml +++ b/ark-cli/Cargo.toml @@ -18,6 +18,7 @@ serde_json = "1.0.82" chrono = "0.4.34" anyhow = "1.0.80" thiserror = "1.0.57" +futures = "0.3" # REGISTRAR log = { version = "0.4.17", features = ["release_max_level_off"] } @@ -25,7 +26,7 @@ lazy_static = "1.4.0" canonical-path = "2.0.2" -fs-index = { path = "../fs-index" } +fs-index = { path = "../fs-index", features = ["watch"] } fs-atomic-versions = { path = "../fs-atomic-versions" } fs-metadata = { path = "../fs-metadata" } fs-properties = { path = "../fs-properties" } diff --git a/ark-cli/USAGE.md b/ark-cli/USAGE.md index 9a77906e..867b5e4e 100644 --- a/ark-cli/USAGE.md +++ b/ark-cli/USAGE.md @@ -136,6 +136,16 @@ $ /tmp/ark-cli list -t --filter=search 22-207093268 search,engine ``` +### Watch a Directory for Changes + +You can watch a directory for changes and automatically update the index by running the following command: + +```sh +ark-cli watch [PATH] +``` + +If you don't provide a path, the current directory (`.`) will be used by default. This command continuously monitors the specified directory for file changes (create, modify, or remove) and updates the index accordingly. It's useful for keeping your index in sync with the latest changes in the folder. + ## :zap: Low-level utilities :zap: There are commands which could be useful with time, when you grasp the basic concepts. Some of these commands also can be useful for debugging [ArkLib](https://github.com/ARK-Builders/ark-rust). diff --git a/ark-cli/src/commands/mod.rs b/ark-cli/src/commands/mod.rs index eab0cc1a..631c3ec6 100644 --- a/ark-cli/src/commands/mod.rs +++ b/ark-cli/src/commands/mod.rs @@ -8,6 +8,7 @@ mod list; mod monitor; mod render; pub mod storage; +mod watch; pub use file::{file_append, file_insert, format_file, format_line}; @@ -18,6 +19,7 @@ pub enum Commands { Monitor(monitor::Monitor), Render(render::Render), List(list::List), + Watch(watch::Watch), #[command(about = "Manage links")] Link { #[clap(subcommand)] diff --git a/ark-cli/src/commands/watch.rs b/ark-cli/src/commands/watch.rs new file mode 100644 index 00000000..6ef1d1e8 --- /dev/null +++ b/ark-cli/src/commands/watch.rs @@ -0,0 +1,74 @@ +use std::path::PathBuf; + +use futures::{pin_mut, StreamExt}; + +use fs_index::{watch_index, WatchEvent}; + +use crate::{AppError, DateTime, ResourceId, Utc}; + +#[derive(Clone, Debug, clap::Args)] +#[clap( + name = "watch", + about = "Watch the ark managed folder for changes and update the index accordingly" +)] +pub struct Watch { + #[clap( + help = "Path to the directory to watch for changes", + default_value = ".", + value_parser + )] + path: PathBuf, +} + +impl Watch { + pub async fn run(&self) -> Result<(), AppError> { + let stream = watch_index::<_, ResourceId>(&self.path); + pin_mut!(stream); + + while let Some(value) = stream.next().await { + match value { + WatchEvent::UpdatedOne(update) => { + println!("Index updated with a single file change"); + + let added = update.added(); + let removed = update.removed(); + for file in added { + let time_stamped_path = file.1.iter().next().unwrap(); + let file_path = time_stamped_path.item(); + let last_modified = time_stamped_path.last_modified(); + let last_modified: DateTime = last_modified.into(); + println!( + "\tAdded file: {:?} (last modified: {})", + file_path, + last_modified.format("%d/%m/%Y %T") + ); + } + for file in removed { + println!("\tRemoved file with hash: {:?}", file); + } + } + WatchEvent::UpdatedAll(update) => { + println!("Index fully updated"); + + let added = update.added(); + let removed = update.removed(); + + for file in added { + let time_stamped_path = file.1.iter().next().unwrap(); + let file_path = time_stamped_path.item(); + let last_modified = time_stamped_path.last_modified(); + println!( + "\tAdded file: {:?} (last modified: {:?})", + file_path, last_modified + ); + } + for file in removed { + println!("\tRemoved file with hash: {:?}", file); + } + } + } + } + + Ok(()) + } +} diff --git a/ark-cli/src/main.rs b/ark-cli/src/main.rs index e8029c18..7158df00 100644 --- a/ark-cli/src/main.rs +++ b/ark-cli/src/main.rs @@ -71,6 +71,7 @@ async fn run() -> Result<()> { Monitor(monitor) => monitor.run()?, Render(render) => render.run()?, List(list) => list.run()?, + Watch(watch) => watch.run().await?, Link { subcommand } => match subcommand { Create(create) => create.run().await?, Load(load) => load.run()?, diff --git a/ark-cli/src/util.rs b/ark-cli/src/util.rs index 40eff290..c5db3536 100644 --- a/ark-cli/src/util.rs +++ b/ark-cli/src/util.rs @@ -179,10 +179,9 @@ pub fn translate_storage( root: &Option, storage: &str, ) -> Option<(PathBuf, Option)> { - if let Ok(path) = PathBuf::from_str(storage) { - if path.exists() && path.is_dir() { - return Some((path, None)); - } + let Ok(path) = PathBuf::from_str(storage); + if path.exists() && path.is_dir() { + return Some((path, None)); } match storage.to_lowercase().as_str() { diff --git a/data-resource/src/lib.rs b/data-resource/src/lib.rs index ea7426c3..9368101c 100644 --- a/data-resource/src/lib.rs +++ b/data-resource/src/lib.rs @@ -25,6 +25,8 @@ pub trait ResourceId: + Hash + Serialize + DeserializeOwned + + Sync + + Send { /// Computes the resource identifier from the given file path fn from_path>(file_path: P) -> Result; diff --git a/fs-index/Cargo.toml b/fs-index/Cargo.toml index 0ded09dd..03393d49 100644 --- a/fs-index/Cargo.toml +++ b/fs-index/Cargo.toml @@ -14,6 +14,12 @@ walkdir = "2.3.2" anyhow = "1.0.58" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } +# For the watch API +notify = { version = "6.1", optional = true } +notify-debouncer-full = { version = "0.3", optional = true } +futures = { version = "0.3", optional = true } +async-stream = { version = "0.3", optional = true } +tokio = { version = "1.40", features = ["full"], optional = true } fs-storage = { path = "../fs-storage" } @@ -21,8 +27,13 @@ fs-storage = { path = "../fs-storage" } data-error = { path = "../data-error" } data-resource = { path = "../data-resource" } + +[features] +watch = ["notify", "notify-debouncer-full", "futures", "async-stream", "tokio"] + [dev-dependencies] uuid = { version = "1.6.1", features = ["v4"] } +env_logger = "0.11" # benchmarking criterion = { version = "0.5", features = ["html_reports"] } tempfile = "3.10" diff --git a/fs-index/README.md b/fs-index/README.md index ca3fb565..427db943 100644 --- a/fs-index/README.md +++ b/fs-index/README.md @@ -15,6 +15,10 @@ The most important struct in this crate is `ResourceIndex` which comes with: - `get_resource_by_path`: Query a resource from the index by its path. - **Selective API** - `update_one`: Method to manually update a specific resource by selectively rescanning a single file. +- **Watch API** (Enable with `watch` feature) + - `watch`: Method to watch a directory for changes and update the index accordingly. + +> **Note:** To see the watch API in action, run the `index_watch` example or check `ark-cli watch` command. ## Custom Serialization @@ -30,8 +34,14 @@ The `ResourceIndex` struct includes a custom serialization implementation to avo To get started, take a look at the examples in the `examples/` directory. -To run a specific example: +To run a specific example, run this command from the root of the project or the root of the crate: + +```shell +cargo run --example +``` + +For example, to run the `index_watch` example: ```shell -cargo run --example resource_index +cargo run --example index_watch ``` diff --git a/fs-index/examples/index_watch.rs b/fs-index/examples/index_watch.rs new file mode 100644 index 00000000..ef01c5b8 --- /dev/null +++ b/fs-index/examples/index_watch.rs @@ -0,0 +1,35 @@ +use std::path::Path; + +use anyhow::Result; +use futures::{pin_mut, StreamExt}; + +use dev_hash::Blake3; +use fs_index::{watch_index, WatchEvent}; + +/// A simple example of using `watch_index` to monitor a directory for file +/// changes. This asynchronously listens for updates and prints the paths of +/// changed files. +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init(); + + // Change this to the path of the directory you want to watch + let root = Path::new("test-assets"); + + let stream = watch_index::<_, Blake3>(root); + + pin_mut!(stream); // needed for iteration + + while let Some(value) = stream.next().await { + match value { + WatchEvent::UpdatedOne(update) => { + println!("Updated file: {:?}", update); + } + WatchEvent::UpdatedAll(update) => { + println!("Updated all: {:?}", update); + } + } + } + + Ok(()) +} diff --git a/fs-index/src/index.rs b/fs-index/src/index.rs index 6746ac29..5cc2aa1f 100644 --- a/fs-index/src/index.rs +++ b/fs-index/src/index.rs @@ -65,6 +65,17 @@ pub struct Timestamped { pub(crate) last_modified: SystemTime, } +impl Timestamped { + pub fn item(&self) -> &Item { + &self.item + } + + /// Return the last modified time + pub fn last_modified(&self) -> SystemTime { + self.last_modified + } +} + type IndexedPaths = HashSet>; /// Represents the index of resources in a directory. @@ -486,15 +497,22 @@ impl ResourceIndex { /// - The index is up-to-date with the file system except for the updated /// resource /// - In case of a addition, the resource was not already in the index - /// - In case of a modification or removal, the resource was already in the - /// index + /// - In case of a removal, the resource was already in the index + /// - In case of a move or rename, `update_one()` should be called twice: + /// once with the old path to remove the previous entry, and once with the + /// new path to add the updated entry pub fn update_one>( &mut self, relative_path: P, - ) -> Result<()> { + ) -> Result> { let path = relative_path.as_ref(); let entry_path = self.root.join(path); + let mut result = IndexUpdate { + added: HashMap::new(), + removed: HashSet::new(), + }; + // Check if the entry exists in the file system if !entry_path.exists() { // If the entry does not exist in the file system, it's a removal @@ -505,16 +523,18 @@ impl ResourceIndex { "Caller must ensure that the resource exists in the index: {:?}", path ); - let id = self.path_to_id.remove(path).unwrap(); + let id = self.path_to_id.remove(path).ok_or_else( + || anyhow::anyhow!("Resource with path {} is neither in the index nor in the file system. Make sure the index is up-to-date.", path.display()) + )?; self.id_to_paths .get_mut(&id.item) - .unwrap() + .expect("Resource ID not found in the ID to paths map") .remove(path); // If the ID has no paths, remove it from the ID to paths map if self.id_to_paths[&id.item].is_empty() { self.id_to_paths.remove(&id.item); + result.removed.insert(id.item); } - log::trace!("Resource removed: {:?}", path); } else { // If the entry exists in the file system, it's an addition or @@ -534,7 +554,7 @@ impl ResourceIndex { if let Some(prev_id) = self.path_to_id.get(path) { self.id_to_paths .get_mut(&prev_id.item) - .unwrap() + .expect("Resource ID not found in the ID to paths map") .remove(path); } @@ -547,9 +567,16 @@ impl ResourceIndex { .or_default() .insert(path.to_path_buf()); + let timpestamped_path = Timestamped { + item: path.to_path_buf(), + last_modified, + }; + result + .added + .insert(id, HashSet::from([timpestamped_path])); log::trace!("Resource added/updated: {:?}", path); } - Ok(()) + Ok(result) } } diff --git a/fs-index/src/lib.rs b/fs-index/src/lib.rs index 59030cef..ab86fe00 100644 --- a/fs-index/src/lib.rs +++ b/fs-index/src/lib.rs @@ -1,10 +1,13 @@ mod index; mod serde; mod utils; +#[cfg(feature = "watch")] +mod watch; +pub use index::{IndexUpdate, ResourceIndex}; pub use utils::load_or_build_index; - -pub use index::ResourceIndex; +#[cfg(feature = "watch")] +pub use watch::{watch_index, WatchEvent}; #[cfg(test)] mod tests; diff --git a/fs-index/src/tests.rs b/fs-index/src/tests.rs index 24f4defa..e4685f2d 100644 --- a/fs-index/src/tests.rs +++ b/fs-index/src/tests.rs @@ -810,7 +810,12 @@ fn test_track_removal_with_collision() { fs::remove_file(&file_path).expect("Failed to remove file"); - index.update_one("file1.txt").expect("Failed to update index"); + let result = index.update_one("file1.txt").expect("Failed to update index"); + + // Assert that `update_one` result is empty + // Rational: There is still a file with the same content as file1.txt so the + // resource was not removed. + assert_eq!(result.removed().len(), 0, "{:?}", result); assert_eq!(index.len(), 1, "{:?}", index); let resource_by_path = index @@ -908,3 +913,46 @@ fn test_track_modification_with_collision() { assert_eq!(index.collisions().len(), 1, "{:?}", index); }); } + +/// Test for calling `update_one()` on a file that was moved from the root +/// directory to a subdirectory. +/// +/// ## Test scenario: +/// - Create a file within the temporary directory. +/// - Build a resource index in the temporary directory. +/// - Move the file to a subdirectory. +/// - Call `update_one()` 2 times with the relative path of the moved file. +/// - Assert that the index contains the expected number of entries with the +/// correct IDs and paths after the move. +#[test] +fn test_track_move_to_subdirectory() { + for_each_type!(Crc32, Blake3 => { + let temp_dir = TempDir::with_prefix("ark_test_track_move_to_subdirectory") + .expect("Failed to create temp dir"); + let root_path = temp_dir.path(); + + let file_path = root_path.join("file.txt"); + fs::write(&file_path, "file content").expect("Failed to write to file"); + let file_id = Id::from_path(&file_path).expect("Failed to get checksum"); + + let mut index: ResourceIndex = + ResourceIndex::build(root_path).expect("Failed to build index"); + + let subdirectory_path = root_path.join("subdirectory"); + fs::create_dir(&subdirectory_path).expect("Failed to create subdirectory"); + + let moved_file_path = subdirectory_path.join("file.txt"); + fs::rename(&file_path, &moved_file_path).expect("Failed to move file"); + + // We need to call `update_one()` 2 times because the file was moved to a + // subdirectory. + index.update_one("file.txt").expect("Failed to update index"); + index.update_one("subdirectory/file.txt").expect("Failed to update index"); + + assert_eq!(index.len(), 1, "{:?}", index); + let resource_by_path = index + .get_resource_by_path("subdirectory/file.txt") + .expect("Failed to get resource"); + assert_eq!(*resource_by_path.id(), file_id); + }); +} diff --git a/fs-index/src/watch.rs b/fs-index/src/watch.rs new file mode 100644 index 00000000..6ed970e3 --- /dev/null +++ b/fs-index/src/watch.rs @@ -0,0 +1,169 @@ +use std::{fs, path::Path, thread, time::Duration}; + +use async_stream::stream; +use futures::Stream; +use notify::{RecursiveMode, Watcher}; +use notify_debouncer_full::new_debouncer; +use tokio::sync::mpsc; + +use data_resource::ResourceId; +use fs_storage::ARK_FOLDER; + +use crate::{IndexUpdate, ResourceIndex}; + +/// Represents the different kinds of events that can occur when watching the +/// resource index. +#[derive(Debug)] +pub enum WatchEvent { + /// Represents an update to a single resource. + UpdatedOne(IndexUpdate), + /// Represents an update to all resources. + UpdatedAll(IndexUpdate), +} + +/// Watches for file system changes and emits events related to the +/// [ResourceIndex]. +/// +/// This function sets up a file watcher that monitors a specified root path for +/// changes to files. It sends events such as file creations, modifications, +/// renames, and deletions through an asynchronous stream. The function uses a +/// debouncer to ensure that multiple rapid events are collapsed into a single +/// event. +pub fn watch_index, Id: ResourceId + 'static>( + root_path: P, +) -> impl Stream> { + log::debug!( + "Attempting to watch index at root path: {:?}", + root_path.as_ref() + ); + + let root_path = fs::canonicalize(root_path.as_ref()).unwrap(); + let mut index: ResourceIndex = + ResourceIndex::build(&root_path).unwrap(); + index.store().unwrap(); + + let (tx, mut rx) = mpsc::channel(100); + let ark_folder = root_path.join(ARK_FOLDER); + + // We need to spawn a new thread to run the blocking file system watcher + thread::spawn(move || { + // Setup the synchronous channel (notify debouncer expects this) + let (sync_tx, sync_rx) = std::sync::mpsc::channel(); + + let mut debouncer = + new_debouncer(Duration::from_secs(2), None, sync_tx).unwrap(); + let watcher = debouncer.watcher(); + watcher + .watch(&root_path, RecursiveMode::Recursive) + .unwrap(); + log::info!("Started debouncer file system watcher for {:?}", root_path); + + while let Ok(events) = sync_rx.recv() { + let events = match events { + Ok(evts) => evts, + Err(errs) => { + for err in errs { + log::error!("Error receiving event: {:?}", err); + } + continue; + } + }; + + // Send events to the async channel + for event in events { + log::trace!("Received event: {:?}", event); + + // If the event is a change in the .ark folder, ignore it + if event + .paths + .iter() + .any(|p| p.starts_with(&ark_folder)) + { + continue; + } + + let event_kind = event.event.kind; + // We only care for: + // - file modifications + // - file renames + // - file creations + // - file deletions + match event_kind { + notify::EventKind::Modify( + notify::event::ModifyKind::Data(_), + ) + | notify::EventKind::Modify( + notify::event::ModifyKind::Name(_), + ) + // On macOS, we noticed that force deleting a file + // triggers a metadata change event for some reason + | notify::EventKind::Modify( + notify::event::ModifyKind::Metadata(notify::event::MetadataKind::Any), + ) + | notify::EventKind::Create( + notify::event::CreateKind::File, + ) + | notify::EventKind::Remove( + notify::event::RemoveKind::File, + ) => {} + _ => continue, + } + + let watch_event: WatchEvent = if event.need_rescan() { + log::info!("Detected rescan event: {:?}", event); + match index.update_all() { + Ok(update_result) => { + WatchEvent::UpdatedAll(update_result) + } + Err(e) => { + log::error!("Failed to update all: {:?}", e); + continue; + } + } + } else { + // Update the index for the specific file + let file = event + .paths + .first() + .expect("Failed to get file path from event"); + + let relative_path = match file.strip_prefix(&root_path) { + Ok(path) => path, + Err(e) => { + log::error!("Failed to get relative path: {:?}", e); + continue; + } + }; + + match index.update_one(relative_path) { + Ok(update_result) => { + WatchEvent::UpdatedOne(update_result) + } + Err(e) => { + log::error!("Failed to update one: {:?}", e); + continue; + } + } + }; + + if let Err(e) = index.store() { + log::error!("Failed to store index: {:?}", e); + } + + // Use blocking send to the async channel because we are in a + // separate thread + if tx.blocking_send(watch_event).is_err() { + log::error!("Failed to send event to async channel"); + break; + } + } + } + }); + + // Create an async stream that reads from the receiver + stream! { + while let Some(event) = rx.recv().await { + yield event; + } + } +} diff --git a/integration/ark-cli-watch.sh b/integration/ark-cli-watch.sh new file mode 100755 index 00000000..58d935f8 --- /dev/null +++ b/integration/ark-cli-watch.sh @@ -0,0 +1,84 @@ +#!/bin/bash + +# Set up paths +WATCH_DIR="test-assets" +OUTPUT_FILE="ark-watch-output.txt" +INDEX_FILE="$WATCH_DIR/.ark/index" +ARK_CLI="./target/release/ark-cli" + +# Function to check the index file content +check_index() { + # Expecting a certain number of resources based on the operations done + expected_count=$1 + shift + expected_resources=("$@") + + # Get the actual count of resources in the index + resources_count=$(jq '.resources | keys | length' "$INDEX_FILE") + + if [ "$resources_count" -ne "$expected_count" ]; then + echo "Index sanity check failed: expected $expected_count resources, found $resources_count" + exit 1 + fi + + # Check the paths of the resources in the index + for resource in "${expected_resources[@]}"; do + if ! jq -e ".resources | has(\"$resource\")" "$INDEX_FILE" > /dev/null; then + echo "Index sanity check failed: resource \"$resource\" not found in index." + exit 1 + fi + done + + echo "Current resources in index:" + jq '.resources' "$INDEX_FILE" +} + +# Start `ark-cli watch` in the background and capture output +echo "Starting ark-cli watch on $WATCH_DIR..." +$ARK_CLI watch "$WATCH_DIR" > "$OUTPUT_FILE" & +WATCH_PID=$! +sleep 1 # Wait a bit to ensure the watch command is up + +# Initial sanity check for index file +check_index 2 "test.pdf" "lena.jpg" # Initially should contain lena.jpg and test.pdf + +echo "Modifying files in $WATCH_DIR..." + +# Step 1: Copy `lena.jpg` to `lena_copy.jpg` +cp "$WATCH_DIR/lena.jpg" "$WATCH_DIR/lena_copy.jpg" +sleep 3 + +check_index 3 "lena.jpg" "lena_copy.jpg" "test.pdf" + +# Step 2: Remove `test.pdf` +rm "$WATCH_DIR/test.pdf" +sleep 3 + +check_index 2 "lena.jpg" "lena_copy.jpg" + +# Step 3: Create a new empty file `note.txt` +touch "$WATCH_DIR/note.txt" +sleep 3 + +# Final index check after all operations +echo "Verifying final index state..." +check_index 3 "lena.jpg" "lena_copy.jpg" "note.txt" # Expect three resources now + +# Allow `ark-cli watch` time to process and then kill it +sleep 1 +kill $WATCH_PID + +# Wait briefly for output to complete +wait $WATCH_PID 2>/dev/null + +# Read and verify the ark-watch-output.txt contents +echo "Checking ark-cli watch output..." +expected_change_count=3 # Three file changes done +actual_change_count=$(grep -c "Index updated with a single file change" "$OUTPUT_FILE") + +if [ "$actual_change_count" -ne "$expected_change_count" ]; then + echo "Output verification failed: expected $expected_change_count updates, found $actual_change_count" + exit 1 +fi + +echo "All checks passed successfully!"