Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watch_index method and ark-cli watch command #36

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ jobs:
- name: Build Release
run: cargo build --verbose --release

- name: Run `ark-cli watch` test
run: ./integration/ark-cli-watch.sh

- name: Install JDK
uses: actions/[email protected]
with:
Expand Down Expand Up @@ -71,6 +74,9 @@ jobs:
- name: Run tests
run: cargo test --workspace --verbose

- name: Run `ark-cli watch` test
run: ./integration/ark-cli-watch.sh

- name: Install JDK
uses: actions/[email protected]
with:
Expand Down Expand Up @@ -100,6 +106,9 @@ jobs:
- name: Run tests
run: cargo test --workspace --verbose

- name: Run `ark-cli watch` test
run: ./integration/ark-cli-watch.sh

- name: Install JDK
uses: actions/[email protected]
with:
Expand Down
3 changes: 2 additions & 1 deletion ark-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ serde_json = "1.0.82"
chrono = "0.4.34"
anyhow = "1.0.80"
thiserror = "1.0.57"
futures = "0.3"

# REGISTRAR
log = { version = "0.4.17", features = ["release_max_level_off"] }
lazy_static = "1.4.0"
canonical-path = "2.0.2"


fs-index = { path = "../fs-index" }
fs-index = { path = "../fs-index", features = ["watch"] }
fs-atomic-versions = { path = "../fs-atomic-versions" }
fs-metadata = { path = "../fs-metadata" }
fs-properties = { path = "../fs-properties" }
Expand Down
10 changes: 10 additions & 0 deletions ark-cli/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ $ /tmp/ark-cli list -t --filter=search
22-207093268 search,engine
```

### Watch a Directory for Changes

You can watch a directory for changes and automatically update the index by running the following command:

```sh
ark-cli watch [PATH]
```

If you don't provide a path, the current directory (`.`) will be used by default. This command continuously monitors the specified directory for file changes (create, modify, or remove) and updates the index accordingly. It's useful for keeping your index in sync with the latest changes in the folder.

## :zap: Low-level utilities :zap:

Some commands become useful over time, once you have grasped the basic concepts. Some of them can also be useful for debugging [ArkLib](https://github.com/ARK-Builders/ark-rust).
Expand Down
2 changes: 2 additions & 0 deletions ark-cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod list;
mod monitor;
mod render;
pub mod storage;
mod watch;

pub use file::{file_append, file_insert, format_file, format_line};

Expand All @@ -18,6 +19,7 @@ pub enum Commands {
Monitor(monitor::Monitor),
Render(render::Render),
List(list::List),
Watch(watch::Watch),
#[command(about = "Manage links")]
Link {
#[clap(subcommand)]
Expand Down
74 changes: 74 additions & 0 deletions ark-cli/src/commands/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::path::PathBuf;

use futures::{pin_mut, StreamExt};

use fs_index::{watch_index, WatchEvent};

use crate::{AppError, DateTime, ResourceId, Utc};

// Command-line arguments of the `watch` subcommand.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros
// pick up `///` doc comments as help text, so documenting with `///` could
// change what `--help` prints.
#[derive(Clone, Debug, clap::Args)]
#[clap(
name = "watch",
about = "Watch the ark managed folder for changes and update the index accordingly"
)]
pub struct Watch {
// Directory to watch. Positional argument; falls back to `.` (the current
// directory) when omitted.
#[clap(
help = "Path to the directory to watch for changes",
default_value = ".",
value_parser
)]
path: PathBuf,
}

impl Watch {
    /// Watches `self.path` and prints every index update to stdout.
    ///
    /// Consumes the stream returned by `watch_index` until it yields `None`.
    /// For each event a headline is printed, followed by one line per added
    /// path (with its last-modified time) and one line per removed resource
    /// ID.
    ///
    /// Every path attached to an added resource ID is reported, so files
    /// with identical content that appear together are all listed, and an
    /// empty path set prints nothing instead of panicking.
    ///
    /// Returns `Ok(())` once the stream ends.
    pub async fn run(&self) -> Result<(), AppError> {
        let stream = watch_index::<_, ResourceId>(&self.path);
        // The stream has to be pinned before `next()` can be called on it.
        pin_mut!(stream);

        while let Some(value) = stream.next().await {
            match value {
                WatchEvent::UpdatedOne(update) => {
                    println!("Index updated with a single file change");

                    for file in update.added() {
                        // `file.1` is the set of paths sharing this ID.
                        for time_stamped_path in file.1.iter() {
                            println!(
                                "\tAdded file: {:?} (last modified: {})",
                                time_stamped_path.item(),
                                Self::format_last_modified(
                                    time_stamped_path.last_modified()
                                )
                            );
                        }
                    }
                    for file in update.removed() {
                        println!("\tRemoved file with hash: {:?}", file);
                    }
                }
                WatchEvent::UpdatedAll(update) => {
                    println!("Index fully updated");

                    for file in update.added() {
                        // `file.1` is the set of paths sharing this ID.
                        for time_stamped_path in file.1.iter() {
                            println!(
                                "\tAdded file: {:?} (last modified: {})",
                                time_stamped_path.item(),
                                Self::format_last_modified(
                                    time_stamped_path.last_modified()
                                )
                            );
                        }
                    }
                    for file in update.removed() {
                        println!("\tRemoved file with hash: {:?}", file);
                    }
                }
            }
        }

        Ok(())
    }

    /// Renders a modification time as a UTC `dd/mm/YYYY HH:MM:SS` string.
    ///
    /// Shared by both event kinds so timestamps look the same regardless of
    /// whether one file or the whole index was updated.
    fn format_last_modified(last_modified: std::time::SystemTime) -> String {
        let last_modified: DateTime<Utc> = last_modified.into();
        last_modified.format("%d/%m/%Y %T").to_string()
    }
}
1 change: 1 addition & 0 deletions ark-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ async fn run() -> Result<()> {
Monitor(monitor) => monitor.run()?,
Render(render) => render.run()?,
List(list) => list.run()?,
Watch(watch) => watch.run().await?,
Link { subcommand } => match subcommand {
Create(create) => create.run().await?,
Load(load) => load.run()?,
Expand Down
7 changes: 3 additions & 4 deletions ark-cli/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,9 @@ pub fn translate_storage(
root: &Option<PathBuf>,
storage: &str,
) -> Option<(PathBuf, Option<StorageType>)> {
if let Ok(path) = PathBuf::from_str(storage) {
if path.exists() && path.is_dir() {
return Some((path, None));
}
let Ok(path) = PathBuf::from_str(storage);
if path.exists() && path.is_dir() {
return Some((path, None));
}

match storage.to_lowercase().as_str() {
Expand Down
2 changes: 2 additions & 0 deletions data-resource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub trait ResourceId:
+ Hash
+ Serialize
+ DeserializeOwned
+ Sync
+ Send
{
/// Computes the resource identifier from the given file path
fn from_path<P: AsRef<Path>>(file_path: P) -> Result<Self>;
Expand Down
11 changes: 11 additions & 0 deletions fs-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,26 @@ walkdir = "2.3.2"
anyhow = "1.0.58"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
# For the watch API
notify = { version = "6.1", optional = true }
notify-debouncer-full = { version = "0.3", optional = true }
futures = { version = "0.3", optional = true }
async-stream = { version = "0.3", optional = true }
tokio = { version = "1.40", features = ["full"], optional = true }


fs-storage = { path = "../fs-storage" }

data-error = { path = "../data-error" }
data-resource = { path = "../data-resource" }


[features]
watch = ["notify", "notify-debouncer-full", "futures", "async-stream", "tokio"]

[dev-dependencies]
uuid = { version = "1.6.1", features = ["v4"] }
env_logger = "0.11"
# benchmarking
criterion = { version = "0.5", features = ["html_reports"] }
tempfile = "3.10"
Expand Down
14 changes: 12 additions & 2 deletions fs-index/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ The most important struct in this crate is `ResourceIndex` which comes with:
- `get_resource_by_path`: Query a resource from the index by its path.
- **Selective API**
- `update_one`: Method to manually update a specific resource by selectively rescanning a single file.
- **Watch API** (Enable with `watch` feature)
- `watch_index`: Function to watch a directory for changes and update the index accordingly.

> **Note:** To see the watch API in action, run the `index_watch` example or check `ark-cli watch` command.

## Custom Serialization

Expand All @@ -30,8 +34,14 @@ The `ResourceIndex` struct includes a custom serialization implementation to avo

To get started, take a look at the examples in the `examples/` directory.

To run a specific example:
To run a specific example, run this command from the root of the project or the root of the crate:

```shell
cargo run --example <example_name>
```

For example, to run the `index_watch` example:

```shell
cargo run --example resource_index
cargo run --example index_watch
```
35 changes: 35 additions & 0 deletions fs-index/examples/index_watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::path::Path;

use anyhow::Result;
use futures::{pin_mut, StreamExt};

use dev_hash::Blake3;
use fs_index::{watch_index, WatchEvent};

/// Minimal demonstration of `watch_index`: watches a directory and, for each
/// event received from the stream, prints the update using its `Debug`
/// representation. Runs until the stream ends.
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    // Point this at whichever directory should be watched
    let watched_dir = Path::new("test-assets");

    let events = watch_index::<_, Blake3>(watched_dir);
    // Pin the stream so that `next()` can be called on it
    pin_mut!(events);

    while let Some(event) = events.next().await {
        // Build the report line first, then emit it in one place
        let report = match event {
            WatchEvent::UpdatedOne(change) => {
                format!("Updated file: {:?}", change)
            }
            WatchEvent::UpdatedAll(change) => {
                format!("Updated all: {:?}", change)
            }
        };
        println!("{}", report);
    }

    Ok(())
}
43 changes: 35 additions & 8 deletions fs-index/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ pub struct Timestamped<Item> {
pub(crate) last_modified: SystemTime,
}

// Accessor methods. The `last_modified` field is `pub(crate)`, so code
// outside this crate reads it through these getters.
impl<Item> Timestamped<Item> {
/// Return a reference to the wrapped item
pub fn item(&self) -> &Item {
&self.item
}

/// Return the last modified time
pub fn last_modified(&self) -> SystemTime {
self.last_modified
}
}
tareknaser marked this conversation as resolved.
Show resolved Hide resolved

type IndexedPaths = HashSet<Timestamped<PathBuf>>;

/// Represents the index of resources in a directory.
Expand Down Expand Up @@ -486,15 +497,22 @@ impl<Id: ResourceId> ResourceIndex<Id> {
/// - The index is up-to-date with the file system except for the updated
/// resource
/// - In case of an addition, the resource was not already in the index
/// - In case of a modification or removal, the resource was already in the
/// index
/// - In case of a removal, the resource was already in the index
/// - In case of a move or rename, `update_one()` should be called twice:
/// once with the old path to remove the previous entry, and once with the
/// new path to add the updated entry
pub fn update_one<P: AsRef<Path>>(
kirillt marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
relative_path: P,
) -> Result<()> {
) -> Result<IndexUpdate<Id>> {
let path = relative_path.as_ref();
let entry_path = self.root.join(path);

let mut result = IndexUpdate {
added: HashMap::new(),
removed: HashSet::new(),
};

// Check if the entry exists in the file system
if !entry_path.exists() {
// If the entry does not exist in the file system, it's a removal
Expand All @@ -505,16 +523,18 @@ impl<Id: ResourceId> ResourceIndex<Id> {
"Caller must ensure that the resource exists in the index: {:?}",
path
);
let id = self.path_to_id.remove(path).unwrap();
let id = self.path_to_id.remove(path).ok_or_else(
|| anyhow::anyhow!("Resource with path {} is neither in the index nor in the file system. Make sure the index is up-to-date.", path.display())
)?;
self.id_to_paths
.get_mut(&id.item)
.unwrap()
.expect("Resource ID not found in the ID to paths map")
.remove(path);
// If the ID has no paths, remove it from the ID to paths map
if self.id_to_paths[&id.item].is_empty() {
self.id_to_paths.remove(&id.item);
result.removed.insert(id.item);
}

log::trace!("Resource removed: {:?}", path);
} else {
// If the entry exists in the file system, it's an addition or
Expand All @@ -534,7 +554,7 @@ impl<Id: ResourceId> ResourceIndex<Id> {
if let Some(prev_id) = self.path_to_id.get(path) {
self.id_to_paths
.get_mut(&prev_id.item)
.unwrap()
.expect("Resource ID not found in the ID to paths map")
.remove(path);
}

Expand All @@ -547,9 +567,16 @@ impl<Id: ResourceId> ResourceIndex<Id> {
.or_default()
.insert(path.to_path_buf());

let timpestamped_path = Timestamped {
item: path.to_path_buf(),
last_modified,
};
result
.added
.insert(id, HashSet::from([timpestamped_path]));
tareknaser marked this conversation as resolved.
Show resolved Hide resolved
log::trace!("Resource added/updated: {:?}", path);
}

Ok(())
Ok(result)
}
}
7 changes: 5 additions & 2 deletions fs-index/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
mod index;
mod serde;
mod utils;
#[cfg(feature = "watch")]
mod watch;

pub use index::{IndexUpdate, ResourceIndex};
pub use utils::load_or_build_index;

pub use index::ResourceIndex;
#[cfg(feature = "watch")]
pub use watch::{watch_index, WatchEvent};

#[cfg(test)]
mod tests;
Loading
Loading