diff --git a/src/util/stream.rs b/src/util/stream.rs index 76e6c27e05e..158e28d5213 100644 --- a/src/util/stream.rs +++ b/src/util/stream.rs @@ -1,7 +1,8 @@ #[cfg(feature = "timer")] use tokio_timer::{ + debounce::{Debounce, DebounceBuilder, Edge}, throttle::Throttle, - Timeout, + timeout::Timeout, }; use futures::Stream; @@ -24,6 +25,63 @@ use std::time::Duration; /// /// [`timeout`]: #method.timeout pub trait StreamExt: Stream { + /// Debounce the stream on the trailing edge using the given duration. + /// + /// Errors will pass through without being debounced. Debouncing will + /// happen on the trailing edge. This means all items (except the last + /// one) will be discarded until the delay has elapsed without an item + /// being passed through. The last item that was passed through will + /// be returned. + /// + /// Care must be taken that this stream returns `Async::NotReady` at some point, + /// otherwise the debouncing implementation will overflow the stack during + /// `.poll()` (i. e. don't use this directly on `stream::repeat`). + /// + /// See also [`debounce_builder`], which allows more configuration over how the + /// debouncing is done. + /// + /// [`debounce_builder`]: #method.debounce_builder + fn debounce(self, dur: Duration) -> Debounce + where Self: Sized + { + self.debounce_builder() + .duration(dur) + .edge(Edge::Trailing) + .build() + } + + /// Create a builder that builds a debounced version of this stream. + /// + /// The returned builder can be used to configure the debouncing process. + /// + /// Care must be taken that this stream returns `Async::NotReady` at some point, + /// otherwise the debouncing implementation will overflow the stack during + /// `.poll()` (i. e. don't use this directly on `stream::repeat`). + fn debounce_builder(self) -> DebounceBuilder + where Self: Sized + { + DebounceBuilder::from_stream(self) + } + + /// Sample the stream at the given `interval`. + /// + /// Sampling works similar to debouncing in that frequent values will be + /// ignored. Sampling, however, ensures that an item is passed through at + /// least after every `interval`. Debounce, on the other hand, would not + /// pass items through until there has been enough "silence". + /// + /// Care must be taken that this stream returns `Async::NotReady` at some point, + /// otherwise the sampling implementation will overflow the stack during + /// `.poll()` (i. e. don't use this directly on `stream::repeat`). + fn sample(self, interval: Duration) -> Debounce + where Self: Sized + { + self.debounce_builder() + .max_wait(interval) + .edge(Edge::Leading) + .build() + } + /// Throttle down the stream by enforcing a fixed delay between items. /// /// Errors are also delayed. diff --git a/tokio-timer/src/debounce.rs b/tokio-timer/src/debounce.rs new file mode 100644 index 00000000000..5c9bdd79ed8 --- /dev/null +++ b/tokio-timer/src/debounce.rs @@ -0,0 +1,408 @@ +//! Debounce streams on the leading or trailing edge or both edges for a certain +//! amount of time. +//! +//! See [`Debounce`] for more details. +//! +//! [`Debounce`]: struct.Debounce.html + +use {clock, Delay, Error}; + +use futures::{ + future::Either, + prelude::*, +}; +use std::{ + cmp, + error::Error as StdError, + fmt::{Display, Formatter, Result as FmtResult}, + time::{Duration, Instant}, +}; + +/// Debounce streams on the leading or trailing edge or both. +/// +/// Useful for slowing processing of e. g. user input or network events +/// to a bearable rate. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Debounce { + delay: Option, + duration: Duration, + edge: Edge, + last_item: Option, + max_wait: Option, + max_wait_to: Option, + stream: T, +} + +/// Builds a debouncing stream. +#[derive(Debug)] +pub struct DebounceBuilder { + duration: Option, + edge: Option, + max_wait: Option, + stream: T, +} + +/// Either the error of the underlying stream, or an error within tokio's +/// timing machinery. +#[derive(Debug)] +pub struct DebounceError(Either); + +/// Which edge the debounce tiggers on. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub enum Edge { + /// The debounce triggers on the leading edge. + /// + /// The first stream item will be returned immediately and subsequent ones + /// will be ignored until the delay has elapsed without items passing through. + Leading, + + /// The debounce triggers on the trailing edge. + /// + /// All items (except the last one) are thrown away until the delay has elapsed + /// without items passing through. The last item is returned. + Trailing, + + /// The debounce triggers on both the leading and the trailing edge. + /// + /// The first and the last items will be returned. + /// + /// Note that trailing edge behavior will only be visible if the underlying + /// stream fires at least twice during the debouncing period. + Both, +} + +impl Debounce { + /// Constructs a new stream that debounces the items passed through. + /// + /// Care must be taken that `stream` returns `Async::NotReady` at some point, + /// otherwise the debouncing implementation will overflow the stack during + /// `.poll()` (i. e. don't use this directly on `stream::repeat`). + pub fn new( + stream: T, + duration: Duration, + edge: Edge, + max_wait: Option, + ) -> Self { + Self { + delay: None, + duration, + edge, + last_item: None, + max_wait, + max_wait_to: None, + stream, + } + } + + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this combinator + /// is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the stream + /// which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so care + /// should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.stream + } + + /// Computes the instant at which the next debounce delay elapses. + fn delay_time(&mut self) -> Instant { + let next = clock::now() + self.duration; + + if let Some(to) = self.max_wait_to { + cmp::min(next, to) + } else { + next + } + } + + /// Polls the underlying delay future. + fn poll_delay(d: &mut Delay) -> Poll<(), ::Error> { + d.poll().map_err(DebounceError::from_timer_error) + } + + /// Polls the underlying stream. + fn poll_stream( + &mut self, + ) -> Poll::Item>, ::Error> { + self.stream.poll().map_err(DebounceError::from_stream_error) + } + + /// Starts a new delay using the current duration and maximum waiting time. + fn start_delay(&mut self) { + self.max_wait_to = self.max_wait.map(|dur| clock::now() + dur); + self.delay = Some(Delay::new(self.delay_time())); + } +} + +impl Stream for Debounce { + type Item = T::Item; + type Error = DebounceError; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.delay.take() { + Some(mut d) => match Self::poll_delay(&mut d)? { + // Delay has woken us up and is over, if we're trailing edge this + // means we need to return the last item. + Async::Ready(_) => { + if self.edge.is_trailing() { + if let Some(item) = self.last_item.take() { + return Ok(Async::Ready(Some(item))); + } + } + + return Ok(Async::NotReady); + }, + + // The stream has woken us up, but we have a delay. + Async::NotReady => match self.poll_stream()? { + // We have gotten an item, but we're currently blocked on + // the delay. Save it for later and reset the timer. + Async::Ready(Some(item)) => { + d.reset(self.delay_time()); + + self.delay = Some(d); + self.last_item = Some(item); + + self.poll() + }, + + // The stream has ended. Communicate this immediately to the + // following stream. + Async::Ready(None) => Ok(Async::Ready(None)), + + Async::NotReady => { + self.delay = Some(d); + Ok(Async::NotReady) + }, + }, + }, + + None => match try_ready!(self.poll_stream()) { + // We have gotten an item. Set up the delay for future items to be + // debounced. If we're on leading edge, return the item, otherwise + // save it for later. + Some(item) => { + self.start_delay(); + + if self.edge.is_leading() { + Ok(Async::Ready(Some(item))) + } else { + self.last_item = Some(item); + + self.poll() + } + }, + + // The stream has ended. Communicate this immediately to the + // following stream. + None => Ok(Async::Ready(None)), + }, + } + } +} + +impl DebounceBuilder { + /// Creates a new builder from the given debounce stream. + /// + /// Care must be taken that `stream` returns `Async::NotReady` at some point, + /// otherwise the debouncing implementation will overflow the stack during + /// `.poll()` (i. e. don't use this directly on `stream::repeat`). + pub fn from_stream(stream: T) -> Self { + DebounceBuilder { + duration: None, + edge: None, + max_wait: None, + stream: stream, + } + } + + /// Sets the duration to debounce to. + /// + /// If no duration is set here but [`max_wait`] is given instead, the resulting + /// stream will sample the underlying stream at the interval given by + /// [`max_wait`] instead of debouncing it. + /// + /// [`max_wait`]: #method.max_wait + pub fn duration(mut self, dur: Duration) -> Self { + self.duration = Some(dur); + self + } + + /// Sets the debouncing edge. + /// + /// An edge MUST be set before trying to [`build`] the debounce stream. + /// + /// [`build`]: #method.build + pub fn edge(mut self, edge: Edge) -> Self { + self.edge = Some(edge); + self + } + + /// Sets the maximum waiting time. + /// + /// If only a `max_wait` is given (and no [`duration`]), the resulting stream + /// will sample the underlying stream at the interval given by `max_wait` + /// instead of debouncing it. + /// Sampling cannot occur on both edges. Trying to build a sampling stream + /// on both edges will panic. + /// + /// [`duration`]: #method.duration + pub fn max_wait(mut self, max_wait: Duration) -> Self { + self.max_wait = Some(max_wait); + self + } +} + +impl DebounceBuilder { + /// Builds the debouncing stream. + /// + /// Panics if the edge or the duration is unspecified, or if only `max_wait` + /// together with `Edge::Both` was specified. + pub fn build(self) -> Debounce { + let edge = self.edge.expect("missing debounce edge"); + + // If we've only been given a maximum waiting time, this means we need to + // sample the stream at the interval given by max_wait instead of + // debouncing it. + let duration = match self.max_wait { + Some(max_wait) => match self.duration { + Some(dur) => dur, + + None => { + // Sampling on both edges leads to unexpected behavior where, when a + // sample interval elapses, two items will be returned. + assert!(edge != Edge::Both, "cannot sample on both edges"); + + // The actual duration added here doesn't matter, as long as its + // means the result is longer than `max_wait` and we have more than + // a millisecond (tokio timer precision). + max_wait + Duration::from_secs(1) + }, + }, + + None => self.duration.expect("missing debounce duration") + }; + + Debounce::new( + self.stream, + duration, + edge, + self.max_wait, + ) + } +} + +impl DebounceError { + /// Creates an error from the given stream error. + pub fn from_stream_error(err: T) -> Self { + DebounceError(Either::A(err)) + } + + /// Creates an error from the given timer error. + pub fn from_timer_error(err: Error) -> Self { + DebounceError(Either::B(err)) + } + + /// Gets the underlying stream error, if present. + pub fn get_stream_error(&self) -> Option<&T> { + match self.0 { + Either::A(ref err) => Some(err), + _ => None, + } + } + + /// Gets the underlying timer error, if present. + pub fn get_timer_error(&self) -> Option<&Error> { + match self.0 { + Either::B(ref err) => Some(err), + _ => None, + } + } + + /// Attempts to convert the error into the stream error. + pub fn into_stream_error(self) -> Option { + match self.0 { + Either::A(err) => Some(err), + _ => None, + } + } + + /// Attempts to convert the error into the timer error. + pub fn into_timer_error(self) -> Option { + match self.0 { + Either::B(err) => Some(err), + _ => None, + } + } + + /// Determines whether the underlying error is a stream error. + pub fn is_stream_error(&self) -> bool { + !self.is_timer_error() + } + + /// Determines whether the underlying error is an error within + /// tokio's timer machinery. + pub fn is_timer_error(&self) -> bool { + match self.0 { + Either::B(_) => true, + _ => false, + } + } +} + +impl Display for DebounceError { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + match self.0 { + Either::A(ref err) => write!(f, "stream error: {}", err), + Either::B(ref err) => write!(f, "timer error: {}", err), + } + } +} + +impl StdError for DebounceError { + fn description(&self) -> &str { + match self.0 { + Either::A(_) => "stream error", + Either::B(_) => "timer error", + } + } + + fn cause(&self) -> Option<&StdError> { + match self.0 { + Either::A(ref err) => Some(err), + Either::B(ref err) => Some(err), + } + } +} + +impl Edge { + /// The edge is either leading edge or both edges. + pub fn is_leading(&self) -> bool { + match self { + Edge::Leading | Edge::Both => true, + _ => false, + } + } + + /// The edge is either trailing edge or both edges. + pub fn is_trailing(&self) -> bool { + match self { + Edge::Trailing | Edge::Both => true, + _ => false + } + } +} diff --git a/tokio-timer/src/lib.rs b/tokio-timer/src/lib.rs index e6d804922ea..6238adf435c 100644 --- a/tokio-timer/src/lib.rs +++ b/tokio-timer/src/lib.rs @@ -5,6 +5,9 @@ //! //! This crate provides a number of utilities for working with periods of time: //! +//! * [`Debounce`]: Wraps a stream, throwing items away until there has been at +//! least some amount of time without items having passed through. +//! //! * [`Delay`]: A future that completes at a specified instant in time. //! //! * [`Interval`] A stream that yields at fixed time intervals. @@ -24,6 +27,7 @@ //! //! [`Delay`]: struct.Delay.html //! [`Throttle`]: throttle/struct.Throttle.html +//! [`Debounce`]: debounce/struct.Debounce.html //! [`Timeout`]: struct.Timeout.html //! [`Interval`]: struct.Interval.html //! [`Timer`]: timer/struct.Timer.html @@ -36,6 +40,7 @@ extern crate futures; extern crate slab; pub mod clock; +pub mod debounce; pub mod delay_queue; pub mod throttle; pub mod timeout; diff --git a/tokio-timer/tests/debounce.rs b/tokio-timer/tests/debounce.rs new file mode 100644 index 00000000000..2b839d71b24 --- /dev/null +++ b/tokio-timer/tests/debounce.rs @@ -0,0 +1,175 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_executor; +extern crate tokio_timer; + +#[macro_use] +mod support; +use support::*; + +use futures::{ + prelude::*, + sync::mpsc, +}; +use tokio::util::StreamExt; +use tokio_timer::{ + debounce::{Debounce, Edge}, + Timer, +}; + +#[test] +fn debounce_leading() { + mocked(|timer, _| { + let (debounced, tx) = make_debounced(Edge::Leading, None); + let items = smoke_tests(timer, tx, debounced); + + assert_eq!(items.len(), 1); + assert_eq!(items[0], 0); + }); +} + +#[test] +fn debounce_trailing_many() { + mocked(|timer, _| { + let (mut debounced, tx) = make_debounced(Edge::Trailing, None); + + // Send in two items. + tx.unbounded_send(1).unwrap(); + tx.unbounded_send(2).unwrap(); + + // We shouldn't be ready yet, but we should have stored 2 as our last item. + assert_not_ready!(debounced); + + // Go past our delay instant. + advance(timer, ms(11)); + + // Poll again, we should get 2. + assert_ready_eq!(debounced, Some(2)); + + // No more items in the stream, delay finished: we should be NotReady. + assert_not_ready!(debounced); + }); +} + +#[test] +fn debounce_trailing() { + mocked(|timer, _| { + let (debounced, tx) = make_debounced(Edge::Trailing, None); + let items = smoke_tests(timer, tx, debounced); + + assert_eq!(items.len(), 1); + assert_eq!(items[0], 4); + }); +} + +#[test] +fn debounce_both() { + mocked(|timer, _| { + let (debounced, tx) = make_debounced(Edge::Both, None); + let items = smoke_tests(timer, tx, debounced); + + assert_eq!(items.len(), 2); + assert_eq!(items[0], 0); + assert_eq!(items[1], 4); + }); +} + +#[test] +fn sample_leading() { + mocked(|timer, _| { + let (debounced, tx) = make_debounced(Edge::Leading, Some(3)); + let items = smoke_tests(timer, tx, debounced); + + assert_eq!(items.len(), 2); + assert_eq!(items[0], 0); + assert_eq!(items[1], 3); + }); +} + +#[test] +fn sample_trailing() { + mocked(|timer, _| { + let (debounced, tx) = make_debounced(Edge::Trailing, Some(3)); + let items = smoke_tests(timer, tx, debounced); + + assert_eq!(items.len(), 2); + assert_eq!(items[0], 2); + assert_eq!(items[1], 4); + }); +} + +#[test] +#[should_panic] +fn sample_both_panics() { + let (_, rx) = mpsc::unbounded::<()>(); + let _ = rx.debounce_builder() + .max_wait(ms(10)) + .edge(Edge::Both) + .build(); +} + +#[test] +fn combinator_debounce() { + let (_, rx1) = mpsc::unbounded::<()>(); + let _ = rx1.debounce(ms(100)); +} + +#[test] +fn combinator_sample() { + let (_, rx1) = mpsc::unbounded::<()>(); + let _ = rx1.sample(ms(100)); +} + +fn make_debounced( + edge: Edge, + max_wait: Option, +) -> (impl Stream, mpsc::UnboundedSender) { + let (tx, rx) = mpsc::unbounded(); + let debounced = Debounce::new( + rx, + ms(10), + edge, + max_wait.map(ms), + ) + .map_err(|e| panic!("stream error: {:?}", e)); + + (debounced, tx) +} + +fn smoke_tests( + timer: &mut Timer, + tx: mpsc::UnboundedSender, + mut s: impl Stream, +) -> Vec { + assert_not_ready!(s); + + let mut result = Vec::new(); + + // Drive forward 1ms at a time adding items to the stream + for i in 0..5 { + tx.unbounded_send(i).unwrap(); + advance(timer, ms(1)); + + match s.poll().unwrap() { + Async::Ready(Some(it)) => result.push(it), + Async::Ready(None) => break, + Async::NotReady => {}, + } + } + + // Pull final items out of stream + for _ in 0..100 { + match s.poll().unwrap() { + Async::Ready(Some(it)) => result.push(it), + Async::Ready(None) => break, + Async::NotReady => {}, + } + + advance(timer, ms(1)); + } + + advance(timer, ms(1000)); + + assert_not_ready!(s); + result +}