From abe00561c7410bab075d7a77b592331e3e27a7df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Sat, 23 Sep 2017 17:32:42 +0200 Subject: [PATCH] Adds new function `interval_range` The functions returns a `Stream` that fires at some random value, that is in the given range, into the future. The next duration is calculated every time after the Interval fired into the future, by choosing a random value in the given range. --- Cargo.toml | 1 + src/interval.rs | 45 ++++++++++++++++++++++++++++++++++++++++----- src/lib.rs | 1 + src/timer.rs | 14 ++++++++++++-- 4 files changed, 54 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4d8d332..a7eb250 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,4 @@ Timer facilities for Tokio [dependencies] futures = "0.1" slab = "0.3.0" +rand = "0.3" diff --git a/src/interval.rs b/src/interval.rs index 3997238..661fb56 100644 --- a/src/interval.rs +++ b/src/interval.rs @@ -4,23 +4,58 @@ use {Sleep, TimerError}; use std::time::Duration; -/// A stream representing notifications at fixed interval +use rand::{thread_rng, Rng}; + +/// A stream representing notifications at given interval /// /// Intervals are created through `Timer::interval`. #[derive(Debug)] pub struct Interval { sleep: Sleep, - duration: Duration, + min_duration: Duration, + max_duration: Duration, } /// Create a new interval -pub fn new(sleep: Sleep, dur: Duration) -> Interval { +pub fn new(sleep: Sleep, min_dur: Duration, max_dur: Duration) -> Interval { Interval { sleep: sleep, - duration: dur, + min_duration: min_dur, + max_duration: max_dur, } } +const NANOS_PER_SEC: u32 = 1_000_000_000; + +/// Returns the next duration for an interval +/// If `min` and `max` are equal, the duration is fixed. +/// If `min` and `max` are not equal, a duration in the range [`min`, `max`] is returned. +/// +/// # Panics +/// +/// Panics if `max < min`. +pub(crate) fn next_duration(min: Duration, max: Duration) -> Duration { + let mut rng = thread_rng(); + + let secs = if min.as_secs() == max.as_secs() { + min.as_secs() + } else { + rng.gen_range(min.as_secs(), max.as_secs() + 1) + }; + + let nsecs = if min.subsec_nanos() == max.subsec_nanos() { + min.subsec_nanos() + } else if secs == min.as_secs() { + rng.gen_range(min.subsec_nanos(), NANOS_PER_SEC) + } else if secs == max.as_secs() { + rng.gen_range(0, max.subsec_nanos() + 1) + } else { + rng.gen_range(0, NANOS_PER_SEC) + }; + + Duration::new(secs, nsecs) +} + impl Stream for Interval { type Item = (); type Error = TimerError; @@ -29,7 +64,7 @@ impl Stream for Interval { let _ = try_ready!(self.sleep.poll()); // Reset the timeout - self.sleep = self.sleep.timer().sleep(self.duration); + self.sleep = self.sleep.timer().sleep(next_duration(self.min_duration, self.max_duration)); Ok(Async::Ready(Some(()))) } diff --git a/src/lib.rs b/src/lib.rs index 9ac3557..2ac0a13 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,6 +90,7 @@ #[macro_use] extern crate futures; extern crate slab; +extern crate rand; mod interval; mod mpmc; diff --git a/src/timer.rs b/src/timer.rs index f95ab4c..efab0de 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -112,7 +112,7 @@ impl Timer { /// Creates a new interval which will fire at `dur` time into the future, /// and will repeat every `dur` interval after pub fn interval(&self, dur: Duration) -> Interval { - interval::new(self.sleep(dur), dur) + interval::new(self.sleep(dur), dur, dur) } /// Creates a new interval which will fire at the time specified by `at`, @@ -126,7 +126,17 @@ impl Timer { self.sleep(Duration::from_millis(0)) }; - interval::new(sleep, dur) + interval::new(sleep, dur, dur) + } + + /// Creates a new interval which will fire at a time in the range [`min`, `max`] into the + /// future, and will repeat every time with a new interval in the range [`min`, `max`] after. + /// + /// # Panics + /// + /// Panics if `max < min`. + pub fn interval_range(&self, min: Duration, max: Duration) -> Interval { + interval::new(self.sleep(interval::next_duration(min, max)), min, max) } }