Skip to content

Commit

Permalink
style: Dynamically update spinner tick rate as a function of throughput
Browse files Browse the repository at this point in the history
  • Loading branch information
crazyscot committed Oct 25, 2024
1 parent 5bfcb34 commit b62e0e7
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 9 deletions.
7 changes: 5 additions & 2 deletions src/cli/cli_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
util::setup_tracing,
};
use clap::Parser;
use indicatif::MultiProgress;
use indicatif::{MultiProgress, ProgressDrawTarget};
use tracing::error_span;

/// Main CLI entrypoint
Expand All @@ -36,7 +36,10 @@ pub fn cli() -> anyhow::Result<ExitCode> {

#[tokio::main(flavor = "current_thread")]
async fn run_client(args: &CliArgs) -> anyhow::Result<ExitCode> {
let progress = MultiProgress::new(); // This writes to stderr
let progress = MultiProgress::with_draw_target(ProgressDrawTarget::stderr_with_hz(
crate::console::MAX_UPDATE_FPS,
));

let trace_level = if args.debug {
"trace"
} else if args.quiet {
Expand Down
16 changes: 12 additions & 4 deletions src/client/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ pub(crate) async fn client_main(args: &CliArgs, progress: &MultiProgress) -> any
// Show time! ---------------------
spinner.set_message("Transferring data");
timers.next(SHOW_TIME);
let file_buffer_size =
usize::try_from(BandwidthConfig::from(BandwidthParams::from(args)).send_buffer)?;
let bandwidth = BandwidthParams::from(args);
let file_buffer_size = usize::try_from(BandwidthConfig::from(bandwidth).send_buffer)?;
let result = manage_request(
connection,
processed_args,
progress.clone(),
spinner.clone(),
file_buffer_size,
bandwidth,
)
.await;
let total_bytes = match result {
Expand Down Expand Up @@ -150,6 +151,7 @@ async fn manage_request(
mp: MultiProgress,
spinner: ProgressBar,
file_buffer_size: usize,
bandwidth: BandwidthParams,
) -> Result<u64, u64> {
let mut tasks = tokio::task::JoinSet::new();
let _jh = tasks.spawn(async move {
Expand All @@ -166,6 +168,7 @@ async fn manage_request(
&processed,
mp,
spinner,
bandwidth,
)
.instrument(trace_span!("GET", filename = processed.source.filename))
.await
Expand All @@ -179,6 +182,7 @@ async fn manage_request(
mp,
spinner,
file_buffer_size,
bandwidth,
)
.instrument(trace_span!("PUT", filename = processed.source.filename))
.await
Expand Down Expand Up @@ -306,6 +310,7 @@ async fn do_get(
cli_args: &UnpackedArgs,
multi_progress: MultiProgress,
spinner: ProgressBar,
bandwidth: BandwidthParams,
) -> Result<u64> {
let mut stream: StreamPair = sp.into();
let real_start = Instant::now();
Expand Down Expand Up @@ -337,7 +342,8 @@ async fn do_get(
let progress_bar = progress_bar_for(&multi_progress, cli_args, header.size + 16)?
.with_elapsed(Instant::now().duration_since(real_start));

let mut meter = crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner);
let mut meter =
crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner, bandwidth.rx());
meter.start().await;

let inbound = progress_bar.wrap_async_read(stream.recv);
Expand Down Expand Up @@ -368,6 +374,7 @@ async fn do_put(
multi_progress: MultiProgress,
spinner: ProgressBar,
file_buffer_size: usize,
bandwidth: BandwidthParams,
) -> Result<u64> {
let mut stream: StreamPair = sp.into();

Expand All @@ -390,7 +397,8 @@ async fn do_put(
let steps = payload_len + 48 + 36 + 16 + 2 * dest_filename.len() as u64;
let progress_bar = progress_bar_for(&multi_progress, cli_args, steps)?;
let mut outbound = progress_bar.wrap_async_write(stream.send);
let mut meter = crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner);
let mut meter =
crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner, bandwidth.tx());
meter.start().await;

trace!("sending command");
Expand Down
80 changes: 77 additions & 3 deletions src/client/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ pub(crate) struct InstaMeterRunner {
}

impl InstaMeterRunner {
pub(crate) fn new(source: &ProgressBar, destination: ProgressBar) -> Self {
pub(crate) fn new(source: &ProgressBar, destination: ProgressBar, max_throughput: u64) -> Self {
Self {
inner: Arc::new(Mutex::new(InstaMeterInner::new(source, destination))),
inner: Arc::new(Mutex::new(InstaMeterInner::new(
source,
destination,
max_throughput,
))),
task: None,
stopper: None,
}
Expand Down Expand Up @@ -92,14 +96,17 @@ pub(crate) struct InstaMeterInner {
previous_position: u64,
source: ProgressBar,
destination: ProgressBar,
tick_calc: TickRateCalculator,
}

impl InstaMeterInner {
pub(crate) fn new(source: &ProgressBar, destination: ProgressBar) -> Self {
pub(crate) fn new(source: &ProgressBar, destination: ProgressBar, max_throughput: u64) -> Self {
#[allow(clippy::cast_precision_loss)]
Self {
previous_position: 0u64,
source: source.clone(),
destination,
tick_calc: TickRateCalculator::new(max_throughput as f64),
}
}

Expand All @@ -116,6 +123,73 @@ impl InstaMeterInner {
rate.human_throughput_bytes()
);
self.destination.set_message(msg.clone());
self.destination
.enable_steady_tick(self.tick_calc.tick_time(progress));
msg
}
}

/// This is a Rust implementation of the calibration algorithm from
/// `https://github.com/rsalmei/alive-progress/blob/main/alive_progress/core/calibration.py`
#[derive(Clone, Copy, Debug)]
struct TickRateCalculator {
calibration: f64,
adjust: f64,
factor: f64,
}

const MIN_FPS: f64 = 0.2;
const MAX_FPS: f64 = crate::console::MAX_UPDATE_FPS as f64;

impl TickRateCalculator {
fn new(max_throughput: f64) -> Self {
let calibration = f64::max(max_throughput, 0.000_001);
let adjust = 100. / f64::min(calibration, 100.);
#[allow(clippy::cast_lossless)]
let factor = (MAX_FPS - MIN_FPS) / ((calibration * adjust) + 1.).log10();

Self {
calibration,
adjust,
factor,
}
}
fn tick_rate(&self, rate: f64) -> f64 {
if rate <= 0. {
10. // Initial rate
} else if rate <= self.calibration {
((rate * self.adjust) + 1.).log10() * self.factor + MIN_FPS
} else {
MAX_FPS
}
}
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn tick_time(&self, rate: f64) -> Duration {
Duration::from_millis((1000. / self.tick_rate(rate)) as u64)
}
}

#[cfg(test)]
mod test {
use super::TickRateCalculator;

fn rate(tput: f64) {
let trc = TickRateCalculator::new(5. * 37_500_000.0);
let hz = trc.tick_rate(tput);
let dura = trc.tick_time(tput);
println!("tput {tput} -> rate {hz} -> {dura:?}");
}

#[test]
fn rates() {
rate(1.);
rate(10.);
rate(100.);
rate(1_000.);
rate(10_000.);
rate(100_000.);
rate(1_000_000.);
rate(10_000_000.);
rate(37_500_000.);
}
}
2 changes: 2 additions & 0 deletions src/console.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Console related functions and styling
// (c) 2024 Ross Younger

pub(crate) const MAX_UPDATE_FPS: u8 = 20;

use console::Term;

const PROGRESS_STYLE_COMPACT: &str =
Expand Down
10 changes: 10 additions & 0 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ impl BandwidthParams {
pub fn bandwidth_delay_product_rx(&self) -> u64 {
self.rx * self.rtt.as_millis() as u64 / 1000
}
#[must_use]
/// Receive bandwidth (accessor)
pub fn rx(&self) -> u64 {
self.rx
}
#[must_use]
/// Transmit bandwidth (accessor)
pub fn tx(&self) -> u64 {
self.tx
}
}

#[derive(Debug, Clone, Copy)]
Expand Down

0 comments on commit b62e0e7

Please sign in to comment.