Skip to content

Commit

Permalink
Add a near-instantaneous read-out of the throughput rate
Browse files Browse the repository at this point in the history
  • Loading branch information
crazyscot committed Oct 25, 2024
1 parent b55283d commit 5bfcb34
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ rcgen = { version = "0.13.1" }
rustls-pki-types = "1.9.0"
static_assertions = "1.1.0"
strum_macros = "0.26.4"
tokio = { version = "1.40.0", default-features = true, features = ["fs", "io-std", "macros", "net", "process", "rt", "time"] }
tokio = { version = "1.40.0", default-features = true, features = ["fs", "io-std", "macros", "net", "process", "rt", "time", "sync"] }
tokio-util = { version = "0.7.12", features = ["compat"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
Expand Down
13 changes: 13 additions & 0 deletions src/client/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub(crate) async fn client_main(args: &CliArgs, progress: &MultiProgress) -> any
connection,
processed_args,
progress.clone(),
spinner.clone(),
file_buffer_size,
)
.await;
Expand Down Expand Up @@ -147,6 +148,7 @@ async fn manage_request(
connection: Connection,
processed: UnpackedArgs,
mp: MultiProgress,
spinner: ProgressBar,
file_buffer_size: usize,
) -> Result<u64, u64> {
let mut tasks = tokio::task::JoinSet::new();
Expand All @@ -163,6 +165,7 @@ async fn manage_request(
&processed.destination.filename,
&processed,
mp,
spinner,
)
.instrument(trace_span!("GET", filename = processed.source.filename))
.await
Expand All @@ -174,6 +177,7 @@ async fn manage_request(
&processed.destination.filename,
&processed,
mp,
spinner,
file_buffer_size,
)
.instrument(trace_span!("PUT", filename = processed.source.filename))
Expand Down Expand Up @@ -301,6 +305,7 @@ async fn do_get(
dest: &str,
cli_args: &UnpackedArgs,
multi_progress: MultiProgress,
spinner: ProgressBar,
) -> Result<u64> {
let mut stream: StreamPair = sp.into();
let real_start = Instant::now();
Expand Down Expand Up @@ -332,6 +337,9 @@ async fn do_get(
let progress_bar = progress_bar_for(&multi_progress, cli_args, header.size + 16)?
.with_elapsed(Instant::now().duration_since(real_start));

let mut meter = crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner);
meter.start().await;

let inbound = progress_bar.wrap_async_read(stream.recv);

let mut inbound = inbound.take(header.size);
Expand All @@ -345,6 +353,7 @@ async fn do_get(
// Trailer is empty for now, but its existence means the server believes the file was sent correctly

// Note that the Quinn send stream automatically calls finish on drop.
meter.stop().await;
file.flush().await?;
trace!("complete");
progress_bar.finish_and_clear();
Expand All @@ -357,6 +366,7 @@ async fn do_put(
dest_filename: &str,
cli_args: &UnpackedArgs,
multi_progress: MultiProgress,
spinner: ProgressBar,
file_buffer_size: usize,
) -> Result<u64> {
let mut stream: StreamPair = sp.into();
Expand All @@ -380,6 +390,8 @@ async fn do_put(
let steps = payload_len + 48 + 36 + 16 + 2 * dest_filename.len() as u64;
let progress_bar = progress_bar_for(&multi_progress, cli_args, steps)?;
let mut outbound = progress_bar.wrap_async_write(stream.send);
let mut meter = crate::client::meter::InstaMeterRunner::new(&progress_bar, spinner);
meter.start().await;

trace!("sending command");
let mut file = BufReader::with_capacity(file_buffer_size, file);
Expand Down Expand Up @@ -439,6 +451,7 @@ async fn do_put(
let trailer = FileTrailer::serialize_direct();
outbound.write_all(&trailer).await?;
outbound.flush().await?;
meter.stop().await;

let response = Response::read(&mut stream.recv).await?;
if response.status != Status::Ok {
Expand Down
121 changes: 121 additions & 0 deletions src/client/meter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Instant progress read-out
// (c) 2024 Ross Younger

//! # Rationale
//! `indicatif` has a smoothed, weighted moving-average estimator.
//! It is good for estimating the ETA, but conceals the full picture when bandwidth is spiky.
//! This struct computes the near-instant progress rate and updates the message on another progress bar.
//! Sorry (not sorry)...
use std::{
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};

use human_repr::HumanThroughput as _;
use indicatif::ProgressBar;
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::{debug, warn};

/// Convenience wrapper for `InstaMeter` that takes care of starting & stopping
#[derive(Debug)]
pub(crate) struct InstaMeterRunner {
inner: Arc<Mutex<InstaMeterInner>>,
task: Option<JoinHandle<()>>,
stopper: Option<oneshot::Sender<()>>,
}

impl InstaMeterRunner {
pub(crate) fn new(source: &ProgressBar, destination: ProgressBar) -> Self {
Self {
inner: Arc::new(Mutex::new(InstaMeterInner::new(source, destination))),
task: None,
stopper: None,
}
}
pub(crate) async fn start(&mut self) {
self.stop().await;
let (tx, mut rx) = oneshot::channel();
self.stopper = Some(tx);
self.task = Some(tokio::spawn({
let inner = self.inner.clone();
async move {
let interval = Duration::from_secs(1);
let mut earlier = SystemTime::now();
loop {
let sleep = tokio::time::sleep(interval);
tokio::pin!(sleep);
tokio::select! {
() = &mut sleep => (), // we woke up, continue
_ = &mut rx => break, // we've been signalled to stop
}

let now = SystemTime::now();
let delta = now.duration_since(earlier).unwrap_or(Duration::ZERO);
let msg = inner.lock().unwrap().update(delta);
debug!("{msg}");
earlier = now;
}
}
}));
}
pub(crate) async fn stop(&mut self) {
let stopper = self.stopper.take();
if let Some(tx) = stopper {
if tx.send(()).is_err() {
warn!("failed to notify meter to stop");
return;
} // else we sent OK.
} else {
return; // nothing to do
}
if let Some(task) = self.task.take() {
let _ = task.await.inspect_err(|e| warn!("meter task paniced: {e}"));
} else {
warn!("logic error: stop called with a stopper but no task");
}
}
}

impl Drop for InstaMeterRunner {
fn drop(&mut self) {
if let Some(t) = self.task.take() {
t.abort();
}
}
}

/// Near-instant progress meter wrapper for `ProgressBar`.
/// This struct holds the inner persistent data that is updated for the life of the struct.
#[derive(Clone, Debug)]
pub(crate) struct InstaMeterInner {
previous_position: u64,
source: ProgressBar,
destination: ProgressBar,
}

impl InstaMeterInner {
pub(crate) fn new(source: &ProgressBar, destination: ProgressBar) -> Self {
Self {
previous_position: 0u64,
source: source.clone(),
destination,
}
}

#[must_use]
fn update(&mut self, elapsed: Duration) -> String {
let current = self.source.position();
#[allow(clippy::cast_precision_loss)]
let progress = (current - self.previous_position) as f64;
let elapsed = elapsed.as_secs_f64();
let rate = progress / elapsed;
self.previous_position = current;
let msg = format!(
"Transferring data, instant rate: {}",
rate.human_throughput_bytes()
);
self.destination.set_message(msg.clone());
msg
}
}
1 change: 1 addition & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

pub mod control;
mod main_loop;
mod meter;

pub(crate) use main_loop::client_main;

0 comments on commit 5bfcb34

Please sign in to comment.