diff --git a/src/state.rs b/src/state.rs
index 9e36aeec..43a5095c 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,9 +1,9 @@
 use std::borrow::Cow;
+use std::io;
 use std::sync::Arc;
 use std::time::Duration;
 #[cfg(not(target_arch = "wasm32"))]
 use std::time::Instant;
-use std::{fmt, io};
 
 #[cfg(target_arch = "wasm32")]
 use instant::Instant;
@@ -230,13 +230,14 @@ pub struct ProgressState {
 impl ProgressState {
     pub(crate) fn new(len: Option<u64>, pos: Arc<AtomicPosition>) -> Self {
+        let now = Instant::now();
         Self {
             pos,
             len,
             tick: 0,
             status: Status::InProgress,
-            started: Instant::now(),
-            est: Estimator::new(Instant::now()),
+            started: now,
+            est: Estimator::new(now),
             message: TabExpandedString::NoTabs("".into()),
             prefix: TabExpandedString::NoTabs("".into()),
         }
     }
@@ -276,7 +277,7 @@ impl ProgressState {
 
         let pos = self.pos.pos.load(Ordering::Relaxed);
 
-        let sps = self.est.steps_per_second();
+        let sps = self.est.steps_per_second(Instant::now());
 
         // Infinite duration should only ever happen at the beginning, so in this case it's okay to
         // just show an ETA of 0 until progress starts to occur.
@@ -298,7 +299,7 @@ impl ProgressState {
     /// The number of steps per second
     pub fn per_sec(&self) -> f64 {
         if let Status::InProgress = self.status {
-            self.est.steps_per_second()
+            self.est.steps_per_second(Instant::now())
        } else {
             let len = self.len.unwrap_or_else(|| self.pos());
             len as f64 / self.started.elapsed().as_secs_f64()
         }
@@ -376,85 +377,131 @@ impl TabExpandedString {
     }
 }
 
-/// Estimate the number of seconds per step
+/// Double-smoothed exponentially weighted estimator
+///
+/// This uses an exponentially weighted *time-based* estimator, meaning that it exponentially
+/// downweights old data based on its age. The rate at which this occurs is currently a constant
+/// value of 15 seconds for 90% weighting. This means that all data older than 15 seconds has a
+/// collective weight of 0.1 in the estimate, and all data older than 30 seconds has a collective
+/// weight of 0.01, and so on.
 ///
-/// Ring buffer with constant capacity. Used by `ProgressBar`s to display `{eta}`,
-/// `{eta_precise}`, and `{*_per_sec}`.
+/// The primary value exposed by `Estimator` is `steps_per_second`. This value is doubly-smoothed,
+/// meaning that it is the result of using an exponentially weighted estimator (as described above) to
+/// estimate the value of another exponentially weighted estimator, which estimates the value of
+/// the raw data.
+///
+/// The purpose of this extra smoothing step is to reduce instantaneous fluctuations in the estimate
+/// when large updates are received. Without this, estimates might have a large spike followed by a
+/// slow asymptotic approach to zero (until the next spike).
+#[derive(Debug)]
 pub(crate) struct Estimator {
-    steps: [f64; 16],
-    pos: u8,
-    full: bool,
+    smoothed_steps_per_sec: f64,
+    double_smoothed_steps_per_sec: f64,
     prev_steps: u64,
     prev_time: Instant,
+    start_time: Instant,
 }
 
 impl Estimator {
     fn new(now: Instant) -> Self {
         Self {
-            steps: [0.0; 16],
-            pos: 0,
-            full: false,
+            smoothed_steps_per_sec: 0.0,
+            double_smoothed_steps_per_sec: 0.0,
             prev_steps: 0,
             prev_time: now,
+            start_time: now,
         }
     }
 
     fn record(&mut self, new_steps: u64, now: Instant) {
-        let delta = new_steps.saturating_sub(self.prev_steps);
-        if delta == 0 || now < self.prev_time {
+        // sanity check: don't record data if time or steps have not advanced
+        if new_steps <= self.prev_steps || now <= self.prev_time {
             // Reset on backwards seek to prevent breakage from seeking to the end for length determination
             // See https://github.com/console-rs/indicatif/issues/480
             if new_steps < self.prev_steps {
+                self.prev_steps = new_steps;
                 self.reset(now);
             }
 
             return;
         }
 
-        let elapsed = now - self.prev_time;
-        let divisor = delta as f64;
-        let mut batch = 0.0;
-        if divisor != 0.0 {
-            batch = duration_to_secs(elapsed) / divisor;
-        };
+        let delta_steps = new_steps - self.prev_steps;
+        let delta_t = duration_to_secs(now - self.prev_time);
 
-        self.steps[self.pos as usize] = batch;
-        self.pos = (self.pos + 1) % 16;
-        if !self.full && self.pos == 0 {
-            self.full = true;
-        }
+        // the rate of steps we saw in this update
+        let new_steps_per_second = delta_steps as f64 / delta_t;
+
+        // update the estimate: a weighted average of the old estimate and new data
+        let weight = estimator_weight(delta_t);
+        self.smoothed_steps_per_sec =
+            self.smoothed_steps_per_sec * weight + new_steps_per_second * (1.0 - weight);
+
+        // An iterative estimate like `smoothed_steps_per_sec` is supposed to be an exponentially
+        // weighted average from t=0 back to t=-inf; since we initialize it to 0, we neglect the
+        // (non-existent) samples in the weighted average prior to the first one, so the resulting
+        // average must be normalized. We normalize the single estimate here in order to use it as
+        // a source for the double smoothed estimate. See comment on normalization in
+        // `steps_per_second` for details.
+        let delta_t_start = duration_to_secs(now - self.start_time);
+        let total_weight = 1.0 - estimator_weight(delta_t_start);
+        let normalized_smoothed_steps_per_sec = self.smoothed_steps_per_sec / total_weight;
+
+        // determine the double smoothed value (EWA smoothing of the single EWA)
+        self.double_smoothed_steps_per_sec = self.double_smoothed_steps_per_sec * weight
+            + normalized_smoothed_steps_per_sec * (1.0 - weight);
 
         self.prev_steps = new_steps;
         self.prev_time = now;
     }
 
+    /// Reset the state of the estimator. Once reset, estimates will not depend on any data prior
+    /// to `now`. This does not reset the stored position of the progress bar.
     pub(crate) fn reset(&mut self, now: Instant) {
-        self.pos = 0;
-        self.full = false;
-        self.prev_steps = 0;
-        self.prev_time = now;
-    }
+        self.smoothed_steps_per_sec = 0.0;
+        self.double_smoothed_steps_per_sec = 0.0;
 
-    /// Average time per step in seconds, using rolling buffer of last 15 steps
-    fn steps_per_second(&self) -> f64 {
-        let len = self.len();
-        len as f64 / self.steps[0..len].iter().sum::<f64>()
-    }
-
-    fn len(&self) -> usize {
-        match self.full {
-            true => 16,
-            false => self.pos as usize,
-        }
-    }
-}
-
-impl fmt::Debug for Estimator {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("Estimate")
-            .field("steps", &&self.steps[..self.len()])
-            .field("prev_steps", &self.prev_steps)
-            .field("prev_time", &self.prev_time)
-            .finish()
+        // only reset prev_time, not prev_steps
+        self.prev_time = now;
+        self.start_time = now;
+    }
+
+    /// Average time per step in seconds, using double exponential smoothing
+    fn steps_per_second(&self, now: Instant) -> f64 {
+        // Because the value stored in the Estimator is only updated when the Estimator receives an
+        // update, this value will become stuck if progress stalls. To return an accurate estimate,
+        // we determine how much time has passed since the last update, and treat this as a
+        // pseudo-update with 0 steps.
+        let delta_t = duration_to_secs(now - self.prev_time);
+        let reweight = estimator_weight(delta_t);
+
+        // Normalization of estimates:
+        //
+        // The raw estimate is a single value (smoothed_steps_per_second) that is iteratively
+        // updated. At each update, the previous value of the estimate is downweighted according to
+        // its age, receiving the iterative weight W(t) = 0.1 ^ (t/15).
+        //
+        // Since W(Sum(t_n)) = Prod(W(t_n)), the total weight of a sample after a series of
+        // iterative steps is simply W(t_e) - W(t_b), where t_e is the time since the end of the
+        // sample, and t_b is the time since the beginning. The resulting estimate is therefore a
+        // weighted average with sample weights W(t_e) - W(t_b).
+        //
+        // Notice that the weighting function generates sample weights that sum to 1 only when the
+        // sample times span from t=0 to t=inf; but this is not the case. We have a first sample
+        // with finite, positive t_b = t_f. In the raw estimate, we handle times prior to t_f by
+        // setting an initial value of 0, meaning that these (non-existent) samples have no weight.
+        //
+        // Therefore, the raw estimate must be normalized by dividing it by the sum of the weights
+        // in the weighted average. This sum is just W(0) - W(t_f), where t_f is the time since the
+        // first sample, and W(0) = 1.
+        let delta_t_start = duration_to_secs(now - self.start_time);
+        let total_weight = 1.0 - estimator_weight(delta_t_start);
+
+        // Generate updated values for `smoothed_steps_per_sec` and `double_smoothed_steps_per_sec`
+        // (sps and dsps) without storing them. Note that we normalize sps when using it as a
+        // source to update dsps, and then normalize dsps itself before returning it.
+        let sps = self.smoothed_steps_per_sec * reweight / total_weight;
+        let dsps = self.double_smoothed_steps_per_sec * reweight + sps * (1.0 - reweight);
+        dsps / total_weight
     }
 }
 
@@ -566,6 +613,35 @@ impl Default for ProgressFinish {
     }
 }
 
+/// Get the appropriate dilution weight for Estimator data given the data's age (in seconds)
+///
+/// Whenever an update occurs, we will create a new estimate using a weight `w_i` like so:
+///
+/// ```math
+/// <new estimate> = <previous estimate> * w_i + <new data> * (1 - w_i)
+/// ```
+///
+/// In other words, the new estimate is a weighted average of the previous estimate and the new
+/// data. We want to choose weights such that for any set of samples where `t_0, t_1, ...` are
+/// the durations of the samples:
+///
+/// ```math
+/// Sum(t_i) = ews ==> Prod(w_i) = 0.1
+/// ```
+///
+/// With this constraint it is easy to show that
+///
+/// ```math
+/// w_i = 0.1 ^ (t_i / ews)
+/// ```
+///
+/// Notice that the constraint implies that estimates are independent of the durations of the
+/// samples, a very useful feature.
+fn estimator_weight(age: f64) -> f64 {
+    const EXPONENTIAL_WEIGHTING_SECONDS: f64 = 15.0;
+    0.1_f64.powf(age / EXPONENTIAL_WEIGHTING_SECONDS)
+}
+
 fn duration_to_secs(d: Duration) -> f64 {
     d.as_secs() as f64 + f64::from(d.subsec_nanos()) / 1_000_000_000f64
 }
@@ -599,23 +675,22 @@ mod tests {
         let mut est = Estimator::new(now);
         let mut pos = 0;
 
-        for _ in 0..est.steps.len() {
+        for _ in 0..20 {
             pos += items_per_second;
             now += Duration::from_secs(1);
             est.record(pos, now);
         }
 
-        let avg_steps_per_second = est.steps_per_second();
+        let avg_steps_per_second = est.steps_per_second(now);
         assert!(avg_steps_per_second > 0.0);
         assert!(avg_steps_per_second.is_finite());
 
-        let expected_rate = items_per_second as f64;
-        let absolute_error = (avg_steps_per_second - expected_rate).abs();
-        let relative_error = absolute_error / expected_rate;
+        let absolute_error = (avg_steps_per_second - items_per_second as f64).abs();
+        let relative_error = absolute_error / items_per_second as f64;
         assert!(
             relative_error < 1.0 / 1e9,
             "Expected rate: {}, actual: {}, relative error: {}",
-            expected_rate,
+            items_per_second,
             avg_steps_per_second,
             relative_error
         );
@@ -633,24 +708,50 @@ mod tests {
     }
 
     #[test]
-    fn test_duration_stuff() {
-        let duration = Duration::new(42, 100_000_000);
-        let secs = duration_to_secs(duration);
-        assert_eq!(secs_to_duration(secs), duration);
+    fn test_double_exponential_ave() {
+        let mut now = Instant::now();
+        let mut est = Estimator::new(now);
+        let mut pos = 0;
+
+        // note: this is the default weight set in the Estimator
+        let weight = 15;
+
+        for _ in 0..weight {
+            pos += 1;
+            now += Duration::from_secs(1);
+            est.record(pos, now);
+        }
+        now += Duration::from_secs(weight);
+
+        // The first level EWA:
+        // -> 90% weight @ 0 eps, 9% weight @ 1 eps, 1% weight @ 0 eps
+        // -> then normalized by deweighting the 1% weight (before -30 seconds)
+        let single_target = 0.09 / 0.99;
+
+        // The second level EWA:
+        // -> same logic as above, but using the first level EWA as the source
+        let double_target = (0.9 * single_target + 0.09) / 0.99;
+        assert_eq!(est.steps_per_second(now), double_target);
     }
 
     #[test]
     fn test_estimator_rewind_position() {
-        let now = Instant::now();
+        let mut now = Instant::now();
         let mut est = Estimator::new(now);
-        est.record(0, now);
+
+        now += Duration::from_secs(1);
         est.record(1, now);
-        assert_eq!(est.len(), 1);
-        // Should not panic.
+
+        // should not panic
+        now += Duration::from_secs(1);
         est.record(0, now);
-        // Assert that the state of the estimator reset on rewind
-        assert_eq!(est.len(), 0);
+        // check that reset occurred (estimator at 1 event per sec)
+        now += Duration::from_secs(1);
+        est.record(1, now);
+        assert_eq!(est.steps_per_second(now), 1.0);
+
+        // check that progress bar handles manual seeking
         let pb = ProgressBar::hidden();
         pb.set_length(10);
         pb.set_position(1);
@@ -659,6 +760,29 @@ mod tests {
         pb.tick();
         pb.set_position(0);
     }
+
+    #[test]
+    fn test_reset_eta() {
+        let mut now = Instant::now();
+        let mut est = Estimator::new(now);
+
+        // two per second, then reset
+        now += Duration::from_secs(1);
+        est.record(2, now);
+        est.reset(now);
+
+        // now one per second, and verify
+        now += Duration::from_secs(1);
+        est.record(3, now);
+        assert_eq!(est.steps_per_second(now), 1.0);
+    }
+
+    #[test]
+    fn test_duration_stuff() {
+        let duration = Duration::new(42, 100_000_000);
+        let secs = duration_to_secs(duration);
+        assert_eq!(secs_to_duration(secs), duration);
+    }
+
     #[test]
     fn test_atomic_position_large_time_difference() {
         let atomic_position = AtomicPosition::new();
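For readers who want to play with the weighting and double-smoothing scheme outside the crate, here is a minimal standalone sketch that mirrors the record/read logic in the patch above. It is illustrative only: `SketchEstimator`, `weight`, `EWS`, and the plain `f64` timestamps are names and simplifications invented for this example (it also omits the rewind/reset handling), not indicatif's API; the constants and update rules follow the doc comments in the diff.

```rust
// Standalone sketch of a double-smoothed, exponentially weighted rate estimator.

const EWS: f64 = 15.0; // data older than EWS seconds keeps only 10% of the weight

/// W(t) = 0.1^(t / EWS). Because W(a + b) = W(a) * W(b), repeatedly aging an
/// estimate gives each sample a weight that depends only on its age.
fn weight(age_secs: f64) -> f64 {
    0.1_f64.powf(age_secs / EWS)
}

struct SketchEstimator {
    smoothed: f64,        // first-level EWA of the observed steps/sec
    double_smoothed: f64, // EWA of the normalized first-level EWA
    prev_steps: u64,
    prev_t: f64,  // time of the last accepted update, in seconds
    start_t: f64, // time of creation, in seconds
}

impl SketchEstimator {
    fn new(t: f64) -> Self {
        Self { smoothed: 0.0, double_smoothed: 0.0, prev_steps: 0, prev_t: t, start_t: t }
    }

    fn record(&mut self, steps: u64, t: f64) {
        // ignore updates where neither time nor position advanced (no rewind handling here)
        if steps <= self.prev_steps || t <= self.prev_t {
            return;
        }
        let dt = t - self.prev_t;
        let rate = (steps - self.prev_steps) as f64 / dt;

        // first level: age the old estimate by dt, then mix in the new rate
        let w = weight(dt);
        self.smoothed = self.smoothed * w + rate * (1.0 - w);

        // normalize: the zero-initialized "history" before start_t carries no weight
        let total_w = 1.0 - weight(t - self.start_t);
        let normalized = self.smoothed / total_w;

        // second level: smooth the normalized first-level estimate the same way
        self.double_smoothed = self.double_smoothed * w + normalized * (1.0 - w);

        self.prev_steps = steps;
        self.prev_t = t;
    }

    fn steps_per_second(&self, t: f64) -> f64 {
        // treat the time since the last update as a pseudo-update of 0 steps/sec
        let reweight = weight(t - self.prev_t);
        let total_w = 1.0 - weight(t - self.start_t);
        let sps = self.smoothed * reweight / total_w;
        let dsps = self.double_smoothed * reweight + sps * (1.0 - reweight);
        dsps / total_w
    }
}

fn main() {
    let mut est = SketchEstimator::new(0.0);
    // one step per second for 15 seconds...
    for s in 1..=15u64 {
        est.record(s, s as f64);
    }
    // ...then a 15-second stall: the estimate decays instead of freezing
    println!("right after progress: {:.3}", est.steps_per_second(15.0));
    println!("after a 15 s stall:   {:.3}", est.steps_per_second(30.0));
}
```

Run as written, this should print 1.000 right after the updates and about 0.174 after the stall, which is the same figure as `double_target` in `test_double_exponential_ave` above.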