From b62e0e7ec12f20eaa1af200cae2f9f687a7c91df Mon Sep 17 00:00:00 2001 From: Ross Younger Date: Tue, 8 Oct 2024 23:56:43 +1300 Subject: [PATCH] style: Dynamically update spinner tick rate as a function of throughput --- src/cli/cli_main.rs | 7 ++-- src/client/main_loop.rs | 16 ++++++--- src/client/meter.rs | 80 +++++++++++++++++++++++++++++++++++++++-- src/console.rs | 2 ++ src/transport.rs | 10 ++++++ 5 files changed, 106 insertions(+), 9 deletions(-) diff --git a/src/cli/cli_main.rs b/src/cli/cli_main.rs index 19e1395..d6e9412 100644 --- a/src/cli/cli_main.rs +++ b/src/cli/cli_main.rs @@ -13,7 +13,7 @@ use crate::{ util::setup_tracing, }; use clap::Parser; -use indicatif::MultiProgress; +use indicatif::{MultiProgress, ProgressDrawTarget}; use tracing::error_span; /// Main CLI entrypoint @@ -36,7 +36,10 @@ pub fn cli() -> anyhow::Result { #[tokio::main(flavor = "current_thread")] async fn run_client(args: &CliArgs) -> anyhow::Result { - let progress = MultiProgress::new(); // This writes to stderr + let progress = MultiProgress::with_draw_target(ProgressDrawTarget::stderr_with_hz( + crate::console::MAX_UPDATE_FPS, + )); + let trace_level = if args.debug { "trace" } else if args.quiet { diff --git a/src/client/main_loop.rs b/src/client/main_loop.rs index 459e91b..18d46a6 100644 --- a/src/client/main_loop.rs +++ b/src/client/main_loop.rs @@ -90,14 +90,15 @@ pub(crate) async fn client_main(args: &CliArgs, progress: &MultiProgress) -> any // Show time! --------------------- spinner.set_message("Transferring data"); timers.next(SHOW_TIME); - let file_buffer_size = - usize::try_from(BandwidthConfig::from(BandwidthParams::from(args)).send_buffer)?; + let bandwidth = BandwidthParams::from(args); + let file_buffer_size = usize::try_from(BandwidthConfig::from(bandwidth).send_buffer)?; let result = manage_request( connection, processed_args, progress.clone(), spinner.clone(), file_buffer_size, + bandwidth, ) .await; let total_bytes = match result { @@ -150,6 +151,7 @@ async fn manage_request( mp: MultiProgress, spinner: ProgressBar, file_buffer_size: usize, + bandwidth: BandwidthParams, ) -> Result { let mut tasks = tokio::task::JoinSet::new(); let _jh = tasks.spawn(async move { @@ -166,6 +168,7 @@ async fn manage_request( &processed, mp, spinner, + bandwidth, ) .instrument(trace_span!("GET", filename = processed.source.filename)) .await @@ -179,6 +182,7 @@ async fn manage_request( mp, spinner, file_buffer_size, + bandwidth, ) .instrument(trace_span!("PUT", filename = processed.source.filename)) .await @@ -306,6 +310,7 @@ async fn do_get( cli_args: &UnpackedArgs, multi_progress: MultiProgress, spinner: ProgressBar, + bandwidth: BandwidthParams, ) -> Result { let mut stream: StreamPair = sp.into(); let real_start = Instant::now(); @@ -337,7 +342,8 @@ async fn do_get( let progress_bar = progress_bar_for(&multi_progress, cli_args, header.size + 16)? .with_elapsed(Instant::now().duration_since(real_start)); - let mut meter = crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner); + let mut meter = + crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner, bandwidth.rx()); meter.start().await; let inbound = progress_bar.wrap_async_read(stream.recv); @@ -368,6 +374,7 @@ async fn do_put( multi_progress: MultiProgress, spinner: ProgressBar, file_buffer_size: usize, + bandwidth: BandwidthParams, ) -> Result { let mut stream: StreamPair = sp.into(); @@ -390,7 +397,8 @@ async fn do_put( let steps = payload_len + 48 + 36 + 16 + 2 * dest_filename.len() as u64; let progress_bar = progress_bar_for(&multi_progress, cli_args, steps)?; let mut outbound = progress_bar.wrap_async_write(stream.send); - let mut meter = crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner); + let mut meter = + crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner, bandwidth.tx()); meter.start().await; trace!("sending command"); diff --git a/src/client/meter.rs b/src/client/meter.rs index 6288803..d05380c 100644 --- a/src/client/meter.rs +++ b/src/client/meter.rs @@ -26,9 +26,13 @@ pub(crate) struct InstaMeterRunner { } impl InstaMeterRunner { - pub(crate) fn new(source: &ProgressBar, destination: ProgressBar) -> Self { + pub(crate) fn new(source: &ProgressBar, destination: ProgressBar, max_throughput: u64) -> Self { Self { - inner: Arc::new(Mutex::new(InstaMeterInner::new(source, destination))), + inner: Arc::new(Mutex::new(InstaMeterInner::new( + source, + destination, + max_throughput, + ))), task: None, stopper: None, } @@ -92,14 +96,17 @@ pub(crate) struct InstaMeterInner { previous_position: u64, source: ProgressBar, destination: ProgressBar, + tick_calc: TickRateCalculator, } impl InstaMeterInner { - pub(crate) fn new(source: &ProgressBar, destination: ProgressBar) -> Self { + pub(crate) fn new(source: &ProgressBar, destination: ProgressBar, max_throughput: u64) -> Self { + #[allow(clippy::cast_precision_loss)] Self { previous_position: 0u64, source: source.clone(), destination, + tick_calc: TickRateCalculator::new(max_throughput as f64), } } @@ -116,6 +123,73 @@ impl InstaMeterInner { rate.human_throughput_bytes() ); self.destination.set_message(msg.clone()); + self.destination + .enable_steady_tick(self.tick_calc.tick_time(progress)); msg } } + +/// This is a Rust implementation of the calibration algorithm from +/// `https://github.com/rsalmei/alive-progress/blob/main/alive_progress/core/calibration.py` +#[derive(Clone, Copy, Debug)] +struct TickRateCalculator { + calibration: f64, + adjust: f64, + factor: f64, +} + +const MIN_FPS: f64 = 0.2; +const MAX_FPS: f64 = crate::console::MAX_UPDATE_FPS as f64; + +impl TickRateCalculator { + fn new(max_throughput: f64) -> Self { + let calibration = f64::max(max_throughput, 0.000_001); + let adjust = 100. / f64::min(calibration, 100.); + #[allow(clippy::cast_lossless)] + let factor = (MAX_FPS - MIN_FPS) / ((calibration * adjust) + 1.).log10(); + + Self { + calibration, + adjust, + factor, + } + } + fn tick_rate(&self, rate: f64) -> f64 { + if rate <= 0. { + 10. // Initial rate + } else if rate <= self.calibration { + ((rate * self.adjust) + 1.).log10() * self.factor + MIN_FPS + } else { + MAX_FPS + } + } + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + fn tick_time(&self, rate: f64) -> Duration { + Duration::from_millis((1000. / self.tick_rate(rate)) as u64) + } +} + +#[cfg(test)] +mod test { + use super::TickRateCalculator; + + fn rate(tput: f64) { + let trc = TickRateCalculator::new(5. * 37_500_000.0); + let hz = trc.tick_rate(tput); + let dura = trc.tick_time(tput); + println!("tput {tput} -> rate {hz} -> {dura:?}"); + } + + #[test] + fn rates() { + rate(1.); + rate(10.); + rate(100.); + rate(1_000.); + rate(10_000.); + rate(100_000.); + rate(1_000_000.); + rate(10_000_000.); + rate(37_500_000.); + } +} diff --git a/src/console.rs b/src/console.rs index 6f673fa..453769c 100644 --- a/src/console.rs +++ b/src/console.rs @@ -1,6 +1,8 @@ // Console related functions and styling // (c) 2024 Ross Younger +pub(crate) const MAX_UPDATE_FPS: u8 = 20; + use console::Term; const PROGRESS_STYLE_COMPACT: &str = diff --git a/src/transport.rs b/src/transport.rs index fce1f40..93b520f 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -75,6 +75,16 @@ impl BandwidthParams { pub fn bandwidth_delay_product_rx(&self) -> u64 { self.rx * self.rtt.as_millis() as u64 / 1000 } + #[must_use] + /// Receive bandwidth (accessor) + pub fn rx(&self) -> u64 { + self.rx + } + #[must_use] + /// Transmit bandwidth (accessor) + pub fn tx(&self) -> u64 { + self.tx + } } #[derive(Debug, Clone, Copy)]