From 5bfcb343aeed689f84fe1065fe4de6f96473e184 Mon Sep 17 00:00:00 2001 From: Ross Younger Date: Tue, 8 Oct 2024 20:54:52 +1300 Subject: [PATCH] Add a near-instantaneous read-out of the throughput rate --- Cargo.toml | 2 +- src/client/main_loop.rs | 13 +++++ src/client/meter.rs | 121 ++++++++++++++++++++++++++++++++++++++++ src/client/mod.rs | 1 + 4 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 src/client/meter.rs diff --git a/Cargo.toml b/Cargo.toml index 31b15f1..1044ea2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ rcgen = { version = "0.13.1" } rustls-pki-types = "1.9.0" static_assertions = "1.1.0" strum_macros = "0.26.4" -tokio = { version = "1.40.0", default-features = true, features = ["fs", "io-std", "macros", "net", "process", "rt", "time"] } +tokio = { version = "1.40.0", default-features = true, features = ["fs", "io-std", "macros", "net", "process", "rt", "time", "sync"] } tokio-util = { version = "0.7.12", features = ["compat"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/src/client/main_loop.rs b/src/client/main_loop.rs index 73e6726..459e91b 100644 --- a/src/client/main_loop.rs +++ b/src/client/main_loop.rs @@ -96,6 +96,7 @@ pub(crate) async fn client_main(args: &CliArgs, progress: &MultiProgress) -> any connection, processed_args, progress.clone(), + spinner.clone(), file_buffer_size, ) .await; @@ -147,6 +148,7 @@ async fn manage_request( connection: Connection, processed: UnpackedArgs, mp: MultiProgress, + spinner: ProgressBar, file_buffer_size: usize, ) -> Result { let mut tasks = tokio::task::JoinSet::new(); @@ -163,6 +165,7 @@ async fn manage_request( &processed.destination.filename, &processed, mp, + spinner, ) .instrument(trace_span!("GET", filename = processed.source.filename)) .await @@ -174,6 +177,7 @@ async fn manage_request( &processed.destination.filename, &processed, mp, + spinner, file_buffer_size, ) .instrument(trace_span!("PUT", filename = processed.source.filename)) @@ -301,6 +305,7 @@ async fn do_get( dest: &str, cli_args: &UnpackedArgs, multi_progress: MultiProgress, + spinner: ProgressBar, ) -> Result { let mut stream: StreamPair = sp.into(); let real_start = Instant::now(); @@ -332,6 +337,9 @@ async fn do_get( let progress_bar = progress_bar_for(&multi_progress, cli_args, header.size + 16)? .with_elapsed(Instant::now().duration_since(real_start)); + let mut meter = crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner); + meter.start().await; + let inbound = progress_bar.wrap_async_read(stream.recv); let mut inbound = inbound.take(header.size); @@ -345,6 +353,7 @@ async fn do_get( // Trailer is empty for now, but its existence means the server believes the file was sent correctly // Note that the Quinn send stream automatically calls finish on drop. + meter.stop().await; file.flush().await?; trace!("complete"); progress_bar.finish_and_clear(); @@ -357,6 +366,7 @@ async fn do_put( dest_filename: &str, cli_args: &UnpackedArgs, multi_progress: MultiProgress, + spinner: ProgressBar, file_buffer_size: usize, ) -> Result { let mut stream: StreamPair = sp.into(); @@ -380,6 +390,8 @@ async fn do_put( let steps = payload_len + 48 + 36 + 16 + 2 * dest_filename.len() as u64; let progress_bar = progress_bar_for(&multi_progress, cli_args, steps)?; let mut outbound = progress_bar.wrap_async_write(stream.send); + let mut meter = crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner); + meter.start().await; trace!("sending command"); let mut file = BufReader::with_capacity(file_buffer_size, file); @@ -439,6 +451,7 @@ async fn do_put( let trailer = FileTrailer::serialize_direct(); outbound.write_all(&trailer).await?; outbound.flush().await?; + meter.stop().await; let response = Response::read(&mut stream.recv).await?; if response.status != Status::Ok { diff --git a/src/client/meter.rs b/src/client/meter.rs new file mode 100644 index 0000000..6288803 --- /dev/null +++ b/src/client/meter.rs @@ -0,0 +1,121 @@ +// Instant progress read-out +// (c) 2024 Ross Younger + +//! # Rationale +//! `indicatif` has a smoothed, weighted moving-average estimator. +//! It is good for estimating the ETA, but conceals the full picture when bandwidth is spiky. +//! This struct computes the near-instant progress rate and updates the message on another progress bar. +//! Sorry (not sorry)... + +use std::{ + sync::{Arc, Mutex}, + time::{Duration, SystemTime}, +}; + +use human_repr::HumanThroughput as _; +use indicatif::ProgressBar; +use tokio::{sync::oneshot, task::JoinHandle}; +use tracing::{debug, warn}; + +/// Convenience wrapper for `InstaMeter` that takes care of starting & stopping +#[derive(Debug)] +pub(crate) struct InstaMeterRunner { + inner: Arc>, + task: Option>, + stopper: Option>, +} + +impl InstaMeterRunner { + pub(crate) fn new(source: &ProgressBar, destination: ProgressBar) -> Self { + Self { + inner: Arc::new(Mutex::new(InstaMeterInner::new(source, destination))), + task: None, + stopper: None, + } + } + pub(crate) async fn start(&mut self) { + self.stop().await; + let (tx, mut rx) = oneshot::channel(); + self.stopper = Some(tx); + self.task = Some(tokio::spawn({ + let inner = self.inner.clone(); + async move { + let interval = Duration::from_secs(1); + let mut earlier = SystemTime::now(); + loop { + let sleep = tokio::time::sleep(interval); + tokio::pin!(sleep); + tokio::select! { + () = &mut sleep => (), // we woke up, continue + _ = &mut rx => break, // we've been signalled to stop + } + + let now = SystemTime::now(); + let delta = now.duration_since(earlier).unwrap_or(Duration::ZERO); + let msg = inner.lock().unwrap().update(delta); + debug!("{msg}"); + earlier = now; + } + } + })); + } + pub(crate) async fn stop(&mut self) { + let stopper = self.stopper.take(); + if let Some(tx) = stopper { + if tx.send(()).is_err() { + warn!("failed to notify meter to stop"); + return; + } // else we sent OK. + } else { + return; // nothing to do + } + if let Some(task) = self.task.take() { + let _ = task.await.inspect_err(|e| warn!("meter task paniced: {e}")); + } else { + warn!("logic error: stop called with a stopper but no task"); + } + } +} + +impl Drop for InstaMeterRunner { + fn drop(&mut self) { + if let Some(t) = self.task.take() { + t.abort(); + } + } +} + +/// Near-instant progress meter wrapper for `ProgressBar`. +/// This struct holds the inner persistent data that is updated for the life of the struct. +#[derive(Clone, Debug)] +pub(crate) struct InstaMeterInner { + previous_position: u64, + source: ProgressBar, + destination: ProgressBar, +} + +impl InstaMeterInner { + pub(crate) fn new(source: &ProgressBar, destination: ProgressBar) -> Self { + Self { + previous_position: 0u64, + source: source.clone(), + destination, + } + } + + #[must_use] + fn update(&mut self, elapsed: Duration) -> String { + let current = self.source.position(); + #[allow(clippy::cast_precision_loss)] + let progress = (current - self.previous_position) as f64; + let elapsed = elapsed.as_secs_f64(); + let rate = progress / elapsed; + self.previous_position = current; + let msg = format!( + "Transferring data, instant rate: {}", + rate.human_throughput_bytes() + ); + self.destination.set_message(msg.clone()); + msg + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 3ce6778..42bad8b 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -2,5 +2,6 @@ pub mod control; mod main_loop; +mod meter; pub(crate) use main_loop::client_main;