diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a6a7d947..7701fb74 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -42,6 +42,9 @@ jobs: - name: Build Release run: cargo build --verbose --release + - name: Run `ark-cli watch` test + run: ./integration/ark-cli-watch.sh + - name: Install JDK uses: actions/setup-java@v4.2.1 with: @@ -71,6 +74,9 @@ jobs: - name: Run tests run: cargo test --workspace --verbose + - name: Run `ark-cli watch` test + run: ./integration/ark-cli-watch.sh + - name: Install JDK uses: actions/setup-java@v4.2.1 with: @@ -100,6 +106,9 @@ jobs: - name: Run tests run: cargo test --workspace --verbose + - name: Run `ark-cli watch` test + run: ./integration/ark-cli-watch.sh + - name: Install JDK uses: actions/setup-java@v4.2.1 with: diff --git a/ark-cli/Cargo.toml b/ark-cli/Cargo.toml index 08655206..ea661d3d 100644 --- a/ark-cli/Cargo.toml +++ b/ark-cli/Cargo.toml @@ -18,6 +18,7 @@ serde_json = "1.0.82" chrono = "0.4.34" anyhow = "1.0.80" thiserror = "1.0.57" +futures = "0.3" # REGISTRAR log = { version = "0.4.17", features = ["release_max_level_off"] } @@ -25,7 +26,7 @@ lazy_static = "1.4.0" canonical-path = "2.0.2" -fs-index = { path = "../fs-index" } +fs-index = { path = "../fs-index", features = ["watch"] } fs-atomic-versions = { path = "../fs-atomic-versions" } fs-metadata = { path = "../fs-metadata" } fs-properties = { path = "../fs-properties" } diff --git a/ark-cli/USAGE.md b/ark-cli/USAGE.md index 9a77906e..867b5e4e 100644 --- a/ark-cli/USAGE.md +++ b/ark-cli/USAGE.md @@ -136,6 +136,16 @@ $ /tmp/ark-cli list -t --filter=search 22-207093268 search,engine ``` +### Watch a Directory for Changes + +You can watch a directory for changes and automatically update the index by running the following command: + +```sh +ark-cli watch [PATH] +``` + +If you don't provide a path, the current directory (`.`) will be used by default. This command continuously monitors the specified directory for file changes (create, modify, or remove) and updates the index accordingly. It's useful for keeping your index in sync with the latest changes in the folder. + ## :zap: Low-level utilities :zap: There are commands which could be useful with time, when you grasp the basic concepts. Some of these commands also can be useful for debugging [ArkLib](https://github.com/ARK-Builders/ark-rust). diff --git a/ark-cli/src/commands/mod.rs b/ark-cli/src/commands/mod.rs index eab0cc1a..631c3ec6 100644 --- a/ark-cli/src/commands/mod.rs +++ b/ark-cli/src/commands/mod.rs @@ -8,6 +8,7 @@ mod list; mod monitor; mod render; pub mod storage; +mod watch; pub use file::{file_append, file_insert, format_file, format_line}; @@ -18,6 +19,7 @@ pub enum Commands { Monitor(monitor::Monitor), Render(render::Render), List(list::List), + Watch(watch::Watch), #[command(about = "Manage links")] Link { #[clap(subcommand)] diff --git a/ark-cli/src/commands/watch.rs b/ark-cli/src/commands/watch.rs new file mode 100644 index 00000000..6ef1d1e8 --- /dev/null +++ b/ark-cli/src/commands/watch.rs @@ -0,0 +1,74 @@ +use std::path::PathBuf; + +use futures::{pin_mut, StreamExt}; + +use fs_index::{watch_index, WatchEvent}; + +use crate::{AppError, DateTime, ResourceId, Utc}; + +#[derive(Clone, Debug, clap::Args)] +#[clap( + name = "watch", + about = "Watch the ark managed folder for changes and update the index accordingly" +)] +pub struct Watch { + #[clap( + help = "Path to the directory to watch for changes", + default_value = ".", + value_parser + )] + path: PathBuf, +} + +impl Watch { + pub async fn run(&self) -> Result<(), AppError> { + let stream = watch_index::<_, ResourceId>(&self.path); + pin_mut!(stream); + + while let Some(value) = stream.next().await { + match value { + WatchEvent::UpdatedOne(update) => { + println!("Index updated with a single file change"); + + let added = update.added(); + let removed = update.removed(); + for file in added { + let time_stamped_path = file.1.iter().next().unwrap(); + let file_path = time_stamped_path.item(); + let last_modified = time_stamped_path.last_modified(); + let last_modified: DateTime = last_modified.into(); + println!( + "\tAdded file: {:?} (last modified: {})", + file_path, + last_modified.format("%d/%m/%Y %T") + ); + } + for file in removed { + println!("\tRemoved file with hash: {:?}", file); + } + } + WatchEvent::UpdatedAll(update) => { + println!("Index fully updated"); + + let added = update.added(); + let removed = update.removed(); + + for file in added { + let time_stamped_path = file.1.iter().next().unwrap(); + let file_path = time_stamped_path.item(); + let last_modified = time_stamped_path.last_modified(); + println!( + "\tAdded file: {:?} (last modified: {:?})", + file_path, last_modified + ); + } + for file in removed { + println!("\tRemoved file with hash: {:?}", file); + } + } + } + } + + Ok(()) + } +} diff --git a/ark-cli/src/main.rs b/ark-cli/src/main.rs index e8029c18..7158df00 100644 --- a/ark-cli/src/main.rs +++ b/ark-cli/src/main.rs @@ -71,6 +71,7 @@ async fn run() -> Result<()> { Monitor(monitor) => monitor.run()?, Render(render) => render.run()?, List(list) => list.run()?, + Watch(watch) => watch.run().await?, Link { subcommand } => match subcommand { Create(create) => create.run().await?, Load(load) => load.run()?, diff --git a/data-resource/src/lib.rs b/data-resource/src/lib.rs index ea7426c3..9368101c 100644 --- a/data-resource/src/lib.rs +++ b/data-resource/src/lib.rs @@ -25,6 +25,8 @@ pub trait ResourceId: + Hash + Serialize + DeserializeOwned + + Sync + + Send { /// Computes the resource identifier from the given file path fn from_path>(file_path: P) -> Result; diff --git a/fs-index/Cargo.toml b/fs-index/Cargo.toml index 0ded09dd..4057f8aa 100644 --- a/fs-index/Cargo.toml +++ b/fs-index/Cargo.toml @@ -14,6 +14,12 @@ walkdir = "2.3.2" anyhow = "1.0.58" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } +# For the watch API +notify = { version = "6.1", optional = true } +notify-debouncer-full = { version = "0.3", optional = true } +futures = { version = "0.3", optional = true } +async-stream = { version = "0.3", optional = true } +tokio = { version = "1.40", features = ["full"], optional = true } fs-storage = { path = "../fs-storage" } @@ -21,6 +27,10 @@ fs-storage = { path = "../fs-storage" } data-error = { path = "../data-error" } data-resource = { path = "../data-resource" } + +[features] +watch = ["notify", "notify-debouncer-full", "futures", "async-stream", "tokio"] + [dev-dependencies] uuid = { version = "1.6.1", features = ["v4"] } # benchmarking diff --git a/fs-index/README.md b/fs-index/README.md index ca3fb565..427db943 100644 --- a/fs-index/README.md +++ b/fs-index/README.md @@ -15,6 +15,10 @@ The most important struct in this crate is `ResourceIndex` which comes with: - `get_resource_by_path`: Query a resource from the index by its path. - **Selective API** - `update_one`: Method to manually update a specific resource by selectively rescanning a single file. +- **Watch API** (Enable with `watch` feature) + - `watch`: Method to watch a directory for changes and update the index accordingly. + +> **Note:** To see the watch API in action, run the `index_watch` example or check `ark-cli watch` command. ## Custom Serialization @@ -30,8 +34,14 @@ The `ResourceIndex` struct includes a custom serialization implementation to avo To get started, take a look at the examples in the `examples/` directory. -To run a specific example: +To run a specific example, run this command from the root of the project or the root of the crate: + +```shell +cargo run --example +``` + +For example, to run the `index_watch` example: ```shell -cargo run --example resource_index +cargo run --example index_watch ``` diff --git a/fs-index/examples/index_watch.rs b/fs-index/examples/index_watch.rs new file mode 100644 index 00000000..f105a3eb --- /dev/null +++ b/fs-index/examples/index_watch.rs @@ -0,0 +1,33 @@ +use std::path::Path; + +use anyhow::Result; +use futures::{pin_mut, StreamExt}; + +use dev_hash::Blake3; +use fs_index::{watch_index, WatchEvent}; + +/// A simple example of using `watch_index` to monitor a directory for file +/// changes. This asynchronously listens for updates and prints the paths of +/// changed files. +#[tokio::main] +async fn main() -> Result<()> { + // Change this to the path of the directory you want to watch + let root = Path::new("test-assets"); + + let stream = watch_index::<_, Blake3>(root); + + pin_mut!(stream); // needed for iteration + + while let Some(value) = stream.next().await { + match value { + WatchEvent::UpdatedOne(update) => { + println!("Updated file: {:?}", update); + } + WatchEvent::UpdatedAll(update) => { + println!("Updated all: {:?}", update); + } + } + } + + Ok(()) +} diff --git a/fs-index/src/index.rs b/fs-index/src/index.rs index 6746ac29..e50eab82 100644 --- a/fs-index/src/index.rs +++ b/fs-index/src/index.rs @@ -65,6 +65,17 @@ pub struct Timestamped { pub(crate) last_modified: SystemTime, } +impl Timestamped { + pub fn item(&self) -> &Item { + &self.item + } + + /// Return the last modified time + pub fn last_modified(&self) -> SystemTime { + self.last_modified + } +} + type IndexedPaths = HashSet>; /// Represents the index of resources in a directory. @@ -491,10 +502,15 @@ impl ResourceIndex { pub fn update_one>( &mut self, relative_path: P, - ) -> Result<()> { + ) -> Result> { let path = relative_path.as_ref(); let entry_path = self.root.join(path); + let mut result = IndexUpdate { + added: HashMap::new(), + removed: HashSet::new(), + }; + // Check if the entry exists in the file system if !entry_path.exists() { // If the entry does not exist in the file system, it's a removal @@ -513,8 +529,8 @@ impl ResourceIndex { // If the ID has no paths, remove it from the ID to paths map if self.id_to_paths[&id.item].is_empty() { self.id_to_paths.remove(&id.item); + result.removed.insert(id.item); } - log::trace!("Resource removed: {:?}", path); } else { // If the entry exists in the file system, it's an addition or @@ -547,9 +563,16 @@ impl ResourceIndex { .or_default() .insert(path.to_path_buf()); + let timpestamped_path = Timestamped { + item: path.to_path_buf(), + last_modified, + }; + result + .added + .insert(id, HashSet::from([timpestamped_path])); log::trace!("Resource added/updated: {:?}", path); } - Ok(()) + Ok(result) } } diff --git a/fs-index/src/lib.rs b/fs-index/src/lib.rs index 59030cef..ab86fe00 100644 --- a/fs-index/src/lib.rs +++ b/fs-index/src/lib.rs @@ -1,10 +1,13 @@ mod index; mod serde; mod utils; +#[cfg(feature = "watch")] +mod watch; +pub use index::{IndexUpdate, ResourceIndex}; pub use utils::load_or_build_index; - -pub use index::ResourceIndex; +#[cfg(feature = "watch")] +pub use watch::{watch_index, WatchEvent}; #[cfg(test)] mod tests; diff --git a/fs-index/src/watch.rs b/fs-index/src/watch.rs new file mode 100644 index 00000000..6ed970e3 --- /dev/null +++ b/fs-index/src/watch.rs @@ -0,0 +1,169 @@ +use std::{fs, path::Path, thread, time::Duration}; + +use async_stream::stream; +use futures::Stream; +use notify::{RecursiveMode, Watcher}; +use notify_debouncer_full::new_debouncer; +use tokio::sync::mpsc; + +use data_resource::ResourceId; +use fs_storage::ARK_FOLDER; + +use crate::{IndexUpdate, ResourceIndex}; + +/// Represents the different kinds of events that can occur when watching the +/// resource index. +#[derive(Debug)] +pub enum WatchEvent { + /// Represents an update to a single resource. + UpdatedOne(IndexUpdate), + /// Represents an update to all resources. + UpdatedAll(IndexUpdate), +} + +/// Watches for file system changes and emits events related to the +/// [ResourceIndex]. +/// +/// This function sets up a file watcher that monitors a specified root path for +/// changes to files. It sends events such as file creations, modifications, +/// renames, and deletions through an asynchronous stream. The function uses a +/// debouncer to ensure that multiple rapid events are collapsed into a single +/// event. +pub fn watch_index, Id: ResourceId + 'static>( + root_path: P, +) -> impl Stream> { + log::debug!( + "Attempting to watch index at root path: {:?}", + root_path.as_ref() + ); + + let root_path = fs::canonicalize(root_path.as_ref()).unwrap(); + let mut index: ResourceIndex = + ResourceIndex::build(&root_path).unwrap(); + index.store().unwrap(); + + let (tx, mut rx) = mpsc::channel(100); + let ark_folder = root_path.join(ARK_FOLDER); + + // We need to spawn a new thread to run the blocking file system watcher + thread::spawn(move || { + // Setup the synchronous channel (notify debouncer expects this) + let (sync_tx, sync_rx) = std::sync::mpsc::channel(); + + let mut debouncer = + new_debouncer(Duration::from_secs(2), None, sync_tx).unwrap(); + let watcher = debouncer.watcher(); + watcher + .watch(&root_path, RecursiveMode::Recursive) + .unwrap(); + log::info!("Started debouncer file system watcher for {:?}", root_path); + + while let Ok(events) = sync_rx.recv() { + let events = match events { + Ok(evts) => evts, + Err(errs) => { + for err in errs { + log::error!("Error receiving event: {:?}", err); + } + continue; + } + }; + + // Send events to the async channel + for event in events { + log::trace!("Received event: {:?}", event); + + // If the event is a change in the .ark folder, ignore it + if event + .paths + .iter() + .any(|p| p.starts_with(&ark_folder)) + { + continue; + } + + let event_kind = event.event.kind; + // We only care for: + // - file modifications + // - file renames + // - file creations + // - file deletions + match event_kind { + notify::EventKind::Modify( + notify::event::ModifyKind::Data(_), + ) + | notify::EventKind::Modify( + notify::event::ModifyKind::Name(_), + ) + // On macOS, we noticed that force deleting a file + // triggers a metadata change event for some reason + | notify::EventKind::Modify( + notify::event::ModifyKind::Metadata(notify::event::MetadataKind::Any), + ) + | notify::EventKind::Create( + notify::event::CreateKind::File, + ) + | notify::EventKind::Remove( + notify::event::RemoveKind::File, + ) => {} + _ => continue, + } + + let watch_event: WatchEvent = if event.need_rescan() { + log::info!("Detected rescan event: {:?}", event); + match index.update_all() { + Ok(update_result) => { + WatchEvent::UpdatedAll(update_result) + } + Err(e) => { + log::error!("Failed to update all: {:?}", e); + continue; + } + } + } else { + // Update the index for the specific file + let file = event + .paths + .first() + .expect("Failed to get file path from event"); + + let relative_path = match file.strip_prefix(&root_path) { + Ok(path) => path, + Err(e) => { + log::error!("Failed to get relative path: {:?}", e); + continue; + } + }; + + match index.update_one(relative_path) { + Ok(update_result) => { + WatchEvent::UpdatedOne(update_result) + } + Err(e) => { + log::error!("Failed to update one: {:?}", e); + continue; + } + } + }; + + if let Err(e) = index.store() { + log::error!("Failed to store index: {:?}", e); + } + + // Use blocking send to the async channel because we are in a + // separate thread + if tx.blocking_send(watch_event).is_err() { + log::error!("Failed to send event to async channel"); + break; + } + } + } + }); + + // Create an async stream that reads from the receiver + stream! { + while let Some(event) = rx.recv().await { + yield event; + } + } +} diff --git a/integration/ark-cli-watch.sh b/integration/ark-cli-watch.sh new file mode 100755 index 00000000..58d935f8 --- /dev/null +++ b/integration/ark-cli-watch.sh @@ -0,0 +1,84 @@ +#!/bin/bash + +# Set up paths +WATCH_DIR="test-assets" +OUTPUT_FILE="ark-watch-output.txt" +INDEX_FILE="$WATCH_DIR/.ark/index" +ARK_CLI="./target/release/ark-cli" + +# Function to check the index file content +check_index() { + # Expecting a certain number of resources based on the operations done + expected_count=$1 + shift + expected_resources=("$@") + + # Get the actual count of resources in the index + resources_count=$(jq '.resources | keys | length' "$INDEX_FILE") + + if [ "$resources_count" -ne "$expected_count" ]; then + echo "Index sanity check failed: expected $expected_count resources, found $resources_count" + exit 1 + fi + + # Check the paths of the resources in the index + for resource in "${expected_resources[@]}"; do + if ! jq -e ".resources | has(\"$resource\")" "$INDEX_FILE" > /dev/null; then + echo "Index sanity check failed: resource \"$resource\" not found in index." + exit 1 + fi + done + + echo "Current resources in index:" + jq '.resources' "$INDEX_FILE" +} + +# Start `ark-cli watch` in the background and capture output +echo "Starting ark-cli watch on $WATCH_DIR..." +$ARK_CLI watch "$WATCH_DIR" > "$OUTPUT_FILE" & +WATCH_PID=$! +sleep 1 # Wait a bit to ensure the watch command is up + +# Initial sanity check for index file +check_index 2 "test.pdf" "lena.jpg" # Initially should contain lena.jpg and test.pdf + +echo "Modifying files in $WATCH_DIR..." + +# Step 1: Copy `lena.jpg` to `lena_copy.jpg` +cp "$WATCH_DIR/lena.jpg" "$WATCH_DIR/lena_copy.jpg" +sleep 3 + +check_index 3 "lena.jpg" "lena_copy.jpg" "test.pdf" + +# Step 2: Remove `test.pdf` +rm "$WATCH_DIR/test.pdf" +sleep 3 + +check_index 2 "lena.jpg" "lena_copy.jpg" + +# Step 3: Create a new empty file `note.txt` +touch "$WATCH_DIR/note.txt" +sleep 3 + +# Final index check after all operations +echo "Verifying final index state..." +check_index 3 "lena.jpg" "lena_copy.jpg" "note.txt" # Expect three resources now + +# Allow `ark-cli watch` time to process and then kill it +sleep 1 +kill $WATCH_PID + +# Wait briefly for output to complete +wait $WATCH_PID 2>/dev/null + +# Read and verify the ark-watch-output.txt contents +echo "Checking ark-cli watch output..." +expected_change_count=3 # Three file changes done +actual_change_count=$(grep -c "Index updated with a single file change" "$OUTPUT_FILE") + +if [ "$actual_change_count" -ne "$expected_change_count" ]; then + echo "Output verification failed: expected $expected_change_count updates, found $actual_change_count" + exit 1 +fi + +echo "All checks passed successfully!"