Skip to content

Commit

Permalink
Merge pull request #17 from CleanCut/file-notifier
Browse files Browse the repository at this point in the history
Add file notifier integration into the
  • Loading branch information
CleanCut authored Sep 21, 2022
2 parents 41baaa0 + 2256153 commit 40db48d
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 76 deletions.
65 changes: 35 additions & 30 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,48 @@ name: CI

on:
push:
branches: [ "main" ]
branches: ["main"]
pull_request:
branches: [ "main" ]
branches: ["main"]
workflow_dispatch:

env:
CARGO_TERM_COLOR: always

jobs:
test:

runs-on: ubuntu-latest
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [macos-latest, ubuntu-latest]

steps:
- uses: actions/checkout@v3

- name: Fetch cargo registry cache
uses: actions/cache@v3
continue-on-error: false
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
Cargo.lock
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

- name: Build
run: cargo build --verbose

- name: rustfmt & clippy
run: |
rustup component add clippy rustfmt
cargo clippy --workspace
cargo fmt --all -- --check
- name: Run tests
run: cargo test --verbose
- uses: actions/checkout@v3

- name: Fetch cargo registry cache
uses: actions/cache@v3
continue-on-error: false
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
Cargo.lock
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

- name: Build
run: cargo build

- name: rustfmt & clippy
run: |
rustup component add clippy rustfmt
cargo clippy --workspace
cargo fmt --all -- --check
- name: Run tests
run: cargo test -- --show-output
env:
RUST_LOG: trace
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
<!-- next-header -->
## [Unreleased] - ReleaseDate

### Added

- CI now runs on macOS in addition to Linux. Now we just need someone to help us [support Windows](https://github.com/CleanCut/headtail/issues/21)!

### Improved

- We now use a notify-based watcher (inotify on Linux, etc.) when available to avoid polling.
- Internal improvements.

## [0.3.0] - 2022-09-15

### Added
Expand Down
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,12 @@ license = "MIT OR Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { version = "3.2.21", features = ["derive"] }
clap = { version = "3.2.21", features = [ "derive" ] }
crossbeam-channel = "0.5.6"
env_logger = "0.9.1"
log = "0.4.17"
notify = "5.0.0"
thiserror = "1.0.35"

[dev-dependencies]
tempfile = "3.3.0"
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ You need to [have Rust installed](https://www.rust-lang.org/tools/install).
```shell
# Install latest *release* version of headtail
$ cargo install headtail

# Install local development version of headtail from inside the git repo
$ cargo install --path .
```

```
Expand All @@ -44,6 +41,19 @@ $ headtail somebigfile.txt -f

See `headtail -h` for a full list of command-line options.

## Development

```
# Run locally with arguments
$ cargo run -- YOUR_ARGS_GO_HERE
# Enable debug logging
$ RUST_LOG=trace cargo run -- YOUR_ARGS_GO_HERE
# Install local development version of headtail into your ~/.cargo/bin
$ cargo install --path .
```

## Software License

Distributed under the terms of both the MIT license and the Apache License (Version 2.0).
Expand Down
10 changes: 10 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum HeadTailError {
#[error("IO error: {0}")]
IOError(#[from] std::io::Error),

#[error("File watcher error: {0}")]
FileWatcherError(#[from] notify::Error),
}
154 changes: 128 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
pub mod errors;
pub mod opts;

use std::{
collections::VecDeque,
io::{BufRead, ErrorKind, Result, Write},
io::{BufRead, ErrorKind, Write},
path::Path,
time::Duration,
};

use errors::HeadTailError;
use log::trace;
use notify::{event::EventKind, Watcher};

use opts::Opts;

fn careful_write(writer: &mut dyn Write, line: &str) -> Result<()> {
fn careful_write(writer: &mut dyn Write, line: &str) -> Result<(), HeadTailError> {
if let Err(e) = writer.write(line.as_bytes()) {
if e.kind() == ErrorKind::BrokenPipe {
return Ok(());
} else {
return Err(e);
return Err(e.into());
}
}
Ok(())
}

pub fn headtail(opts: &Opts) -> Result<()> {
pub fn headtail(opts: &Opts) -> Result<(), HeadTailError> {
let mut reader = opts.input_stream()?;
let mut writer = opts.output_stream()?;

Expand All @@ -28,17 +34,18 @@ pub fn headtail(opts: &Opts) -> Result<()> {
let mut line_num = 0;
loop {
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(0) => {
match reader.read_line(&mut line)? {
0 => {
for tail_line in &tail_buffer {
careful_write(&mut writer, tail_line)?;
}
let _ = writer.flush();
break;
}
Ok(_) => {
_ => {
if opts.head > line_num {
line_num += 1;
trace!(target: "head line", "read line: {}", line.trim_end());
careful_write(&mut writer, &line)?;
let _ = writer.flush();
} else {
Expand All @@ -48,31 +55,126 @@ pub fn headtail(opts: &Opts) -> Result<()> {
}
}
}
Err(e) => return Err(e),
}
}

// Keep following(?)

// Keep following
//
// To avoid wasted CPU cycles, we can use a file system watcher (e.g.
// `inotify(7)` on Linux).
//
// The `notify` crate provides several optimized file system watchers using
// functionality built into operating systems. Should an optimized watcher
// not be available, `notify` will default to a polling watcher.
if opts.follow && opts.filename.is_some() {
// Use a channel to send lines read back to the main thread
// TODO: 1024 is an arbitrary number. Let's benchmark different values.
let (tx, rx) = crossbeam_channel::bounded::<Result<String, HeadTailError>>(1024);

// If using a polling watcher, respect the `--sleep-interval` argument.
let sleep_interval = Duration::from_secs_f64(opts.sleep_interval);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line) {
Ok(0) => {
// This is a busy loop, so add a little sleep to make it less CPU hungry
std::thread::sleep(sleep_interval);
}
Ok(_) => {
careful_write(&mut writer, &line)?;
let _ = writer.flush();
}
Err(e) => {
println!("The error is {:?}", e.kind());
return Err(e);
let config = notify::Config::default().with_poll_interval(sleep_interval);

// Setup the file watcher
let opts2 = opts.clone(); // TODO: Refactor so we don't need to clone opts
let mut watcher = notify::RecommendedWatcher::new(
move |res: notify::Result<notify::Event>| {
match res {
Ok(event) => {
match event.kind {
EventKind::Any => trace!("EventKind::Any encountered"),
EventKind::Modify(m) => {
// TODO: Should(can?) we handle the truncation of a file? On macOS
// file truncation shows up as an EventKind::Modify(Metadata(Any)),
// which seems like could apply to events other than truncation.
trace!(target: "following file", "modified: {:?}", m);
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(0) => {}
Ok(_) => {
// If the main thread has closed the channel, it will soon cause
// us to exit cleanly, so we can ignore the error.
let _ = tx.send(Ok(line));
}
Err(e) => {
// Can ignore channel send error for the same reason as above...
trace!(target: "following file", "normal read error");
let _ = tx.send(Err(e.into()));
}
}
}
EventKind::Create(_) => {
trace!(target: "following file", "detected possible file (re)creation");
// The file has been recreated, so we need to re-open the input stream,
// read *everything* that is in the new file, and resume tailing.
let result = opts2.input_stream();
match result {
Ok(new_reader) => {
trace!(target: "following file", "succeeded reopening file");
reader = new_reader;
}
Err(e) => {
if let ErrorKind::NotFound = e.kind() {
trace!(target: "following file", "cannot find file...aborting reopen");
return;
}
// Can ignore channel send error for the same reason as above...
let _ = tx.send(Err(e.into()));
}
}
loop {
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(0) => {
trace!(target: "following file", "catchup done");
break;
}
Ok(_) => {
trace!(target: "following file", "catchup read line: {}", line.trim_end());
// If the main thread has closed the channel, it will soon cause us to
// exit cleanly, so we can ignore the error.
let _ = tx.send(Ok(line));
}
Err(e) => {
// Can ignore sending error for same reason as 👆🏻
let _ = tx.send(Err(e.into()));
break;
}
}
}
}
EventKind::Remove(r) => {
trace!(target: "following file", "file removed: {:?}", r)
}
// We are being explicit about the variants we are ignoring just in case we
// need to research them.
EventKind::Access(_) => {}
EventKind::Other => {
trace!(target: "following file", "EventKind::Other encountered")
}
};
}
Err(e) => {
let _ = tx.send(Err(e.into()));
}
}
}
},
config,
)?;

// TODO: Figure out what to do about the possibility of a race condition between the initial
// headtail and the following. See https://github.com/CleanCut/headtail/pull/17/files#r973220630
watcher.watch(
Path::new(opts.filename.as_ref().unwrap()),
notify::RecursiveMode::NonRecursive,
)?;

// Loop over the lines sent from the `notify` watcher over a channel. This will block the
// main thread without sleeping.
for result in rx {
let line = result?;
careful_write(&mut writer, &line)?;
let _ = writer.flush();
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::io::Result;
use headtail::{errors::HeadTailError, headtail, opts::Opts};

use headtail::{headtail, opts::Opts};

fn main() -> Result<()> {
fn main() -> Result<(), HeadTailError> {
env_logger::init();
let opts = Opts::parse_args();
//println!("{opts:#?}");
headtail(&opts)?;
Expand Down
Loading

0 comments on commit 40db48d

Please sign in to comment.