Skip to content

Commit

Permalink
watcher-rs
Browse files Browse the repository at this point in the history
  • Loading branch information
Will committed Oct 20, 2024
1 parent 648ba00 commit 6a4a663
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 0 deletions.
53 changes: 53 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
[package]
name = "wtr-watcher"
version = "0.12.1" # hook: tool/release
edition = "2021"
build = "watcher-rs/build.rs"
authors = ["Will <[email protected]>"]
description = "Filesystem watcher. Works anywhere. Simple, efficient and friendly."
documentation = "https://github.com/e-dant/watcher/blob/release/readme.md"
homepage = "https://github.com/e-dant/watcher"
keywords = [
"events",
"filesystem",
"monitoring",
"tracing",
"watcher",
]
categories = [
"asynchronous",
"command-line-interface",
"command-line-utilities",
"development-tools",
"filesystem",
]
license = "MIT"
readme = "readme.md"
repository = "https://github.com/e-dant/watcher"

[lib]
name = "wtr_watcher"
path = "watcher-rs/src/lib.rs"

[[bin]]
name = "wtr-watcher"
path = "watcher-rs/src/main.rs"
required-features = ["cli"]

[[example]]
name = "show-events"
path = "watcher-rs/examples/show-events.rs"

[features]
cli = ["serde_json", "tokio/rt", "tokio/macros", "tokio/signal"]
default = ["cli"]

[dependencies]
futures = { version = "0.3.31", default-features = false }
serde = { version = "1.0.205", features = ["derive"] }
serde_json = { version = "1.0.122", optional = true }
tokio = { version = "1.0.1", features = ["sync"], default-features = false }

[build-dependencies]
cc = "1"
bindgen = "0"
32 changes: 32 additions & 0 deletions watcher-rs/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
fn main() {
let out_dir = std::env::var("OUT_DIR").unwrap();
let out_dir = std::path::Path::new(&out_dir);
bindgen::Builder::default()
.header(format!("watcher-c/include/wtr/watcher-c.h"))
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
.generate()
.unwrap()
.write_to_file(out_dir.join("watcher_c.rs"))
.unwrap();
if cfg!(target_os = "macos") {
println!("cargo:rustc-link-lib=framework=CoreFoundation");
println!("cargo:rustc-link-lib=framework=CoreServices");
}
cc::Build::new()
.cpp(true)
.opt_level(2)
.files(["watcher-c/src/watcher-c.cpp"].iter())
.include("watcher-c/include")
.include("include")
.flag_if_supported("-std=c++17")
.flag_if_supported("-fno-exceptions")
.flag_if_supported("-fno-rtti")
.flag_if_supported("-fno-omit-frame-pointer")
.flag_if_supported("-fstrict-enums")
.flag_if_supported("-fstrict-overflow")
.flag_if_supported("-fstrict-aliasing")
.flag_if_supported("-fstack-protector-strong")
.flag_if_supported("/std:c++17")
.flag_if_supported("/EHsc")
.compile("watcher_c");
}
12 changes: 12 additions & 0 deletions watcher-rs/examples/show-events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use futures::StreamExt;
use std::env::args;
use wtr_watcher::Watch;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let path = args().nth(1).unwrap_or_else(|| ".".to_string());
let show = |e| async move { println!("{e:?}") };
let events = Watch::try_new(&path)?;
events.for_each(show).await;
Ok(())
}
147 changes: 147 additions & 0 deletions watcher-rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
include!(concat!(env!("OUT_DIR"), "/watcher_c.rs"));
use core::ffi::c_void;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum EffectType {
Rename,
Modify,
Create,
Destroy,
Owner,
Other,
}

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum PathType {
Dir,
File,
HardLink,
SymLink,
Watcher,
Other,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Event {
pub effect_time: i64,
pub path_name: String,
pub associated_path_name: String,
pub effect_type: EffectType,
pub path_type: PathType,
}

fn c_ptr_as_str<'a>(ptr: *const std::os::raw::c_char) -> &'a str {
if ptr.is_null() {
return "";
}
let b = unsafe { std::ffi::CStr::from_ptr(ptr).to_bytes() };
std::str::from_utf8(b).unwrap_or_default()
}

fn effect_type_from_c(effect_type: i8) -> EffectType {
match effect_type {
WTR_WATCHER_EFFECT_RENAME => EffectType::Rename,
WTR_WATCHER_EFFECT_MODIFY => EffectType::Modify,
WTR_WATCHER_EFFECT_CREATE => EffectType::Create,
WTR_WATCHER_EFFECT_DESTROY => EffectType::Destroy,
WTR_WATCHER_EFFECT_OWNER => EffectType::Owner,
WTR_WATCHER_EFFECT_OTHER => EffectType::Other,
_ => EffectType::Other,
}
}

fn path_type_from_c(path_type: i8) -> PathType {
match path_type {
WTR_WATCHER_PATH_DIR => PathType::Dir,
WTR_WATCHER_PATH_FILE => PathType::File,
WTR_WATCHER_PATH_HARD_LINK => PathType::HardLink,
WTR_WATCHER_PATH_SYM_LINK => PathType::SymLink,
WTR_WATCHER_PATH_WATCHER => PathType::Watcher,
WTR_WATCHER_PATH_OTHER => PathType::Other,
_ => PathType::Other,
}
}

fn ev_from_c<'a>(event: wtr_watcher_event) -> Event {
Event {
effect_time: event.effect_time,
path_name: c_ptr_as_str(event.path_name).to_string(),
associated_path_name: c_ptr_as_str(event.associated_path_name).to_string(),
effect_type: effect_type_from_c(event.effect_type),
path_type: path_type_from_c(event.path_type),
}
}

unsafe extern "C" fn callback_bridge(event: wtr_watcher_event, data: *mut c_void) {
let ev = ev_from_c(event);
let tx = std::mem::transmute::<*mut c_void, &Sender<Event>>(data);
let _ = tx.blocking_send(ev);
}

#[allow(dead_code)]
pub struct Watch {
watcher: *mut c_void,
ev_rx: Receiver<Event>,
ev_tx: Box<Sender<Event>>,
}

impl Watch {
pub fn try_new(path: &str) -> Result<Watch, &'static str> {
let path = std::ffi::CString::new(path).unwrap();
let (ev_tx, ev_rx) = tokio::sync::mpsc::channel(1);
let ev_tx = Box::new(ev_tx);
let ev_tx_opaque = unsafe { std::mem::transmute::<&Sender<Event>, *mut c_void>(&ev_tx) };
match unsafe { wtr_watcher_open(path.as_ptr(), Some(callback_bridge), ev_tx_opaque) } {
watcher if watcher.is_null() => Err("wtr_watcher_open"),
watcher => Ok(Watch {
watcher,
ev_rx,
ev_tx,
}),
}
/*
let watcher_opaque =
unsafe { wtr_watcher_open(path.as_ptr(), Some(callback_bridge), ev_tx_opaque) };
if watcher_opaque.is_null() {
Err("wtr_watcher_open")
} else {
Ok(Watcher {
watcher_opaque,
ev_rx,
ev_tx,
})
}
*/
}

pub fn close(&self) -> Result<(), &'static str> {
match unsafe { wtr_watcher_close(self.watcher) } {
false => Err("wtr_watcher_close"),
true => Ok(()),
}
}
}

impl futures::Stream for Watch {
type Item = Event;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context,
) -> std::task::Poll<Option<Self::Item>> {
self.ev_rx.poll_recv(cx)
}
}

impl Drop for Watch {
fn drop(&mut self) {
let _ = self.close();
}
}
16 changes: 16 additions & 0 deletions watcher-rs/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use futures::StreamExt;
use std::env::args;
use wtr_watcher::Watch;

async fn show(e: wtr_watcher::Event) {
println!("{}", serde_json::to_string(&e).unwrap())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let p = args().nth(1).unwrap_or_else(|| ".".to_string());
tokio::select! {
_ = Watch::try_new(&p)?.for_each(show) => Ok(()),
_ = tokio::signal::ctrl_c() => Ok(()),
}
}

0 comments on commit 6a4a663

Please sign in to comment.