diff --git a/src/util/stream.rs b/src/util/stream.rs index 158e28d5213..76e6c27e05e 100644 --- a/src/util/stream.rs +++ b/src/util/stream.rs @@ -1,8 +1,7 @@ #[cfg(feature = "timer")] use tokio_timer::{ - debounce::{Debounce, DebounceBuilder, Edge}, throttle::Throttle, - timeout::Timeout, + Timeout, }; use futures::Stream; @@ -25,63 +24,6 @@ use std::time::Duration; /// /// [`timeout`]: #method.timeout pub trait StreamExt: Stream { - /// Debounce the stream on the trailing edge using the given duration. - /// - /// Errors will pass through without being debounced. Debouncing will - /// happen on the trailing edge. This means all items (except the last - /// one) will be discarded until the delay has elapsed without an item - /// being passed through. The last item that was passed through will - /// be returned. - /// - /// Care must be taken that this stream returns `Async::NotReady` at some point, - /// otherwise the debouncing implementation will overflow the stack during - /// `.poll()` (i. e. don't use this directly on `stream::repeat`). - /// - /// See also [`debounce_builder`], which allows more configuration over how the - /// debouncing is done. - /// - /// [`debounce_builder`]: #method.debounce_builder - fn debounce(self, dur: Duration) -> Debounce - where Self: Sized - { - self.debounce_builder() - .duration(dur) - .edge(Edge::Trailing) - .build() - } - - /// Create a builder that builds a debounced version of this stream. - /// - /// The returned builder can be used to configure the debouncing process. - /// - /// Care must be taken that this stream returns `Async::NotReady` at some point, - /// otherwise the debouncing implementation will overflow the stack during - /// `.poll()` (i. e. don't use this directly on `stream::repeat`). - fn debounce_builder(self) -> DebounceBuilder - where Self: Sized - { - DebounceBuilder::from_stream(self) - } - - /// Sample the stream at the given `interval`. - /// - /// Sampling works similar to debouncing in that frequent values will be - /// ignored. Sampling, however, ensures that an item is passed through at - /// least after every `interval`. Debounce, on the other hand, would not - /// pass items through until there has been enough "silence". - /// - /// Care must be taken that this stream returns `Async::NotReady` at some point, - /// otherwise the sampling implementation will overflow the stack during - /// `.poll()` (i. e. don't use this directly on `stream::repeat`). - fn sample(self, interval: Duration) -> Debounce - where Self: Sized - { - self.debounce_builder() - .max_wait(interval) - .edge(Edge::Leading) - .build() - } - /// Throttle down the stream by enforcing a fixed delay between items. /// /// Errors are also delayed. diff --git a/tokio-timer/src/debounce.rs b/tokio-timer/src/debounce.rs deleted file mode 100644 index 5c9bdd79ed8..00000000000 --- a/tokio-timer/src/debounce.rs +++ /dev/null @@ -1,408 +0,0 @@ -//! Debounce streams on the leading or trailing edge or both edges for a certain -//! amount of time. -//! -//! See [`Debounce`] for more details. -//! -//! [`Debounce`]: struct.Debounce.html - -use {clock, Delay, Error}; - -use futures::{ - future::Either, - prelude::*, -}; -use std::{ - cmp, - error::Error as StdError, - fmt::{Display, Formatter, Result as FmtResult}, - time::{Duration, Instant}, -}; - -/// Debounce streams on the leading or trailing edge or both. -/// -/// Useful for slowing processing of e. g. user input or network events -/// to a bearable rate. -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Debounce { - delay: Option, - duration: Duration, - edge: Edge, - last_item: Option, - max_wait: Option, - max_wait_to: Option, - stream: T, -} - -/// Builds a debouncing stream. -#[derive(Debug)] -pub struct DebounceBuilder { - duration: Option, - edge: Option, - max_wait: Option, - stream: T, -} - -/// Either the error of the underlying stream, or an error within tokio's -/// timing machinery. -#[derive(Debug)] -pub struct DebounceError(Either); - -/// Which edge the debounce tiggers on. -#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub enum Edge { - /// The debounce triggers on the leading edge. - /// - /// The first stream item will be returned immediately and subsequent ones - /// will be ignored until the delay has elapsed without items passing through. - Leading, - - /// The debounce triggers on the trailing edge. - /// - /// All items (except the last one) are thrown away until the delay has elapsed - /// without items passing through. The last item is returned. - Trailing, - - /// The debounce triggers on both the leading and the trailing edge. - /// - /// The first and the last items will be returned. - /// - /// Note that trailing edge behavior will only be visible if the underlying - /// stream fires at least twice during the debouncing period. - Both, -} - -impl Debounce { - /// Constructs a new stream that debounces the items passed through. - /// - /// Care must be taken that `stream` returns `Async::NotReady` at some point, - /// otherwise the debouncing implementation will overflow the stack during - /// `.poll()` (i. e. don't use this directly on `stream::repeat`). - pub fn new( - stream: T, - duration: Duration, - edge: Edge, - max_wait: Option, - ) -> Self { - Self { - delay: None, - duration, - edge, - last_item: None, - max_wait, - max_wait_to: None, - stream, - } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &T { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this combinator - /// is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the stream - /// which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut T { - &mut self.stream - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so care - /// should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> T { - self.stream - } - - /// Computes the instant at which the next debounce delay elapses. - fn delay_time(&mut self) -> Instant { - let next = clock::now() + self.duration; - - if let Some(to) = self.max_wait_to { - cmp::min(next, to) - } else { - next - } - } - - /// Polls the underlying delay future. - fn poll_delay(d: &mut Delay) -> Poll<(), ::Error> { - d.poll().map_err(DebounceError::from_timer_error) - } - - /// Polls the underlying stream. - fn poll_stream( - &mut self, - ) -> Poll::Item>, ::Error> { - self.stream.poll().map_err(DebounceError::from_stream_error) - } - - /// Starts a new delay using the current duration and maximum waiting time. - fn start_delay(&mut self) { - self.max_wait_to = self.max_wait.map(|dur| clock::now() + dur); - self.delay = Some(Delay::new(self.delay_time())); - } -} - -impl Stream for Debounce { - type Item = T::Item; - type Error = DebounceError; - - fn poll(&mut self) -> Poll, Self::Error> { - match self.delay.take() { - Some(mut d) => match Self::poll_delay(&mut d)? { - // Delay has woken us up and is over, if we're trailing edge this - // means we need to return the last item. - Async::Ready(_) => { - if self.edge.is_trailing() { - if let Some(item) = self.last_item.take() { - return Ok(Async::Ready(Some(item))); - } - } - - return Ok(Async::NotReady); - }, - - // The stream has woken us up, but we have a delay. - Async::NotReady => match self.poll_stream()? { - // We have gotten an item, but we're currently blocked on - // the delay. Save it for later and reset the timer. - Async::Ready(Some(item)) => { - d.reset(self.delay_time()); - - self.delay = Some(d); - self.last_item = Some(item); - - self.poll() - }, - - // The stream has ended. Communicate this immediately to the - // following stream. - Async::Ready(None) => Ok(Async::Ready(None)), - - Async::NotReady => { - self.delay = Some(d); - Ok(Async::NotReady) - }, - }, - }, - - None => match try_ready!(self.poll_stream()) { - // We have gotten an item. Set up the delay for future items to be - // debounced. If we're on leading edge, return the item, otherwise - // save it for later. - Some(item) => { - self.start_delay(); - - if self.edge.is_leading() { - Ok(Async::Ready(Some(item))) - } else { - self.last_item = Some(item); - - self.poll() - } - }, - - // The stream has ended. Communicate this immediately to the - // following stream. - None => Ok(Async::Ready(None)), - }, - } - } -} - -impl DebounceBuilder { - /// Creates a new builder from the given debounce stream. - /// - /// Care must be taken that `stream` returns `Async::NotReady` at some point, - /// otherwise the debouncing implementation will overflow the stack during - /// `.poll()` (i. e. don't use this directly on `stream::repeat`). - pub fn from_stream(stream: T) -> Self { - DebounceBuilder { - duration: None, - edge: None, - max_wait: None, - stream: stream, - } - } - - /// Sets the duration to debounce to. - /// - /// If no duration is set here but [`max_wait`] is given instead, the resulting - /// stream will sample the underlying stream at the interval given by - /// [`max_wait`] instead of debouncing it. - /// - /// [`max_wait`]: #method.max_wait - pub fn duration(mut self, dur: Duration) -> Self { - self.duration = Some(dur); - self - } - - /// Sets the debouncing edge. - /// - /// An edge MUST be set before trying to [`build`] the debounce stream. - /// - /// [`build`]: #method.build - pub fn edge(mut self, edge: Edge) -> Self { - self.edge = Some(edge); - self - } - - /// Sets the maximum waiting time. - /// - /// If only a `max_wait` is given (and no [`duration`]), the resulting stream - /// will sample the underlying stream at the interval given by `max_wait` - /// instead of debouncing it. - /// Sampling cannot occur on both edges. Trying to build a sampling stream - /// on both edges will panic. - /// - /// [`duration`]: #method.duration - pub fn max_wait(mut self, max_wait: Duration) -> Self { - self.max_wait = Some(max_wait); - self - } -} - -impl DebounceBuilder { - /// Builds the debouncing stream. - /// - /// Panics if the edge or the duration is unspecified, or if only `max_wait` - /// together with `Edge::Both` was specified. - pub fn build(self) -> Debounce { - let edge = self.edge.expect("missing debounce edge"); - - // If we've only been given a maximum waiting time, this means we need to - // sample the stream at the interval given by max_wait instead of - // debouncing it. - let duration = match self.max_wait { - Some(max_wait) => match self.duration { - Some(dur) => dur, - - None => { - // Sampling on both edges leads to unexpected behavior where, when a - // sample interval elapses, two items will be returned. - assert!(edge != Edge::Both, "cannot sample on both edges"); - - // The actual duration added here doesn't matter, as long as its - // means the result is longer than `max_wait` and we have more than - // a millisecond (tokio timer precision). - max_wait + Duration::from_secs(1) - }, - }, - - None => self.duration.expect("missing debounce duration") - }; - - Debounce::new( - self.stream, - duration, - edge, - self.max_wait, - ) - } -} - -impl DebounceError { - /// Creates an error from the given stream error. - pub fn from_stream_error(err: T) -> Self { - DebounceError(Either::A(err)) - } - - /// Creates an error from the given timer error. - pub fn from_timer_error(err: Error) -> Self { - DebounceError(Either::B(err)) - } - - /// Gets the underlying stream error, if present. - pub fn get_stream_error(&self) -> Option<&T> { - match self.0 { - Either::A(ref err) => Some(err), - _ => None, - } - } - - /// Gets the underlying timer error, if present. - pub fn get_timer_error(&self) -> Option<&Error> { - match self.0 { - Either::B(ref err) => Some(err), - _ => None, - } - } - - /// Attempts to convert the error into the stream error. - pub fn into_stream_error(self) -> Option { - match self.0 { - Either::A(err) => Some(err), - _ => None, - } - } - - /// Attempts to convert the error into the timer error. - pub fn into_timer_error(self) -> Option { - match self.0 { - Either::B(err) => Some(err), - _ => None, - } - } - - /// Determines whether the underlying error is a stream error. - pub fn is_stream_error(&self) -> bool { - !self.is_timer_error() - } - - /// Determines whether the underlying error is an error within - /// tokio's timer machinery. - pub fn is_timer_error(&self) -> bool { - match self.0 { - Either::B(_) => true, - _ => false, - } - } -} - -impl Display for DebounceError { - fn fmt(&self, f: &mut Formatter) -> FmtResult { - match self.0 { - Either::A(ref err) => write!(f, "stream error: {}", err), - Either::B(ref err) => write!(f, "timer error: {}", err), - } - } -} - -impl StdError for DebounceError { - fn description(&self) -> &str { - match self.0 { - Either::A(_) => "stream error", - Either::B(_) => "timer error", - } - } - - fn cause(&self) -> Option<&StdError> { - match self.0 { - Either::A(ref err) => Some(err), - Either::B(ref err) => Some(err), - } - } -} - -impl Edge { - /// The edge is either leading edge or both edges. - pub fn is_leading(&self) -> bool { - match self { - Edge::Leading | Edge::Both => true, - _ => false, - } - } - - /// The edge is either trailing edge or both edges. - pub fn is_trailing(&self) -> bool { - match self { - Edge::Trailing | Edge::Both => true, - _ => false - } - } -} diff --git a/tokio-timer/src/lib.rs b/tokio-timer/src/lib.rs index 6238adf435c..e6d804922ea 100644 --- a/tokio-timer/src/lib.rs +++ b/tokio-timer/src/lib.rs @@ -5,9 +5,6 @@ //! //! This crate provides a number of utilities for working with periods of time: //! -//! * [`Debounce`]: Wraps a stream, throwing items away until there has been at -//! least some amount of time without items having passed through. -//! //! * [`Delay`]: A future that completes at a specified instant in time. //! //! * [`Interval`] A stream that yields at fixed time intervals. @@ -27,7 +24,6 @@ //! //! [`Delay`]: struct.Delay.html //! [`Throttle`]: throttle/struct.Throttle.html -//! [`Debounce`]: debounce/struct.Debounce.html //! [`Timeout`]: struct.Timeout.html //! [`Interval`]: struct.Interval.html //! [`Timer`]: timer/struct.Timer.html @@ -40,7 +36,6 @@ extern crate futures; extern crate slab; pub mod clock; -pub mod debounce; pub mod delay_queue; pub mod throttle; pub mod timeout; diff --git a/tokio-timer/tests/debounce.rs b/tokio-timer/tests/debounce.rs deleted file mode 100644 index 2b839d71b24..00000000000 --- a/tokio-timer/tests/debounce.rs +++ /dev/null @@ -1,175 +0,0 @@ -extern crate futures; -extern crate tokio; -extern crate tokio_executor; -extern crate tokio_timer; - -#[macro_use] -mod support; -use support::*; - -use futures::{ - prelude::*, - sync::mpsc, -}; -use tokio::util::StreamExt; -use tokio_timer::{ - debounce::{Debounce, Edge}, - Timer, -}; - -#[test] -fn debounce_leading() { - mocked(|timer, _| { - let (debounced, tx) = make_debounced(Edge::Leading, None); - let items = smoke_tests(timer, tx, debounced); - - assert_eq!(items.len(), 1); - assert_eq!(items[0], 0); - }); -} - -#[test] -fn debounce_trailing_many() { - mocked(|timer, _| { - let (mut debounced, tx) = make_debounced(Edge::Trailing, None); - - // Send in two items. - tx.unbounded_send(1).unwrap(); - tx.unbounded_send(2).unwrap(); - - // We shouldn't be ready yet, but we should have stored 2 as our last item. - assert_not_ready!(debounced); - - // Go past our delay instant. - advance(timer, ms(11)); - - // Poll again, we should get 2. - assert_ready_eq!(debounced, Some(2)); - - // No more items in the stream, delay finished: we should be NotReady. - assert_not_ready!(debounced); - }); -} - -#[test] -fn debounce_trailing() { - mocked(|timer, _| { - let (debounced, tx) = make_debounced(Edge::Trailing, None); - let items = smoke_tests(timer, tx, debounced); - - assert_eq!(items.len(), 1); - assert_eq!(items[0], 4); - }); -} - -#[test] -fn debounce_both() { - mocked(|timer, _| { - let (debounced, tx) = make_debounced(Edge::Both, None); - let items = smoke_tests(timer, tx, debounced); - - assert_eq!(items.len(), 2); - assert_eq!(items[0], 0); - assert_eq!(items[1], 4); - }); -} - -#[test] -fn sample_leading() { - mocked(|timer, _| { - let (debounced, tx) = make_debounced(Edge::Leading, Some(3)); - let items = smoke_tests(timer, tx, debounced); - - assert_eq!(items.len(), 2); - assert_eq!(items[0], 0); - assert_eq!(items[1], 3); - }); -} - -#[test] -fn sample_trailing() { - mocked(|timer, _| { - let (debounced, tx) = make_debounced(Edge::Trailing, Some(3)); - let items = smoke_tests(timer, tx, debounced); - - assert_eq!(items.len(), 2); - assert_eq!(items[0], 2); - assert_eq!(items[1], 4); - }); -} - -#[test] -#[should_panic] -fn sample_both_panics() { - let (_, rx) = mpsc::unbounded::<()>(); - let _ = rx.debounce_builder() - .max_wait(ms(10)) - .edge(Edge::Both) - .build(); -} - -#[test] -fn combinator_debounce() { - let (_, rx1) = mpsc::unbounded::<()>(); - let _ = rx1.debounce(ms(100)); -} - -#[test] -fn combinator_sample() { - let (_, rx1) = mpsc::unbounded::<()>(); - let _ = rx1.sample(ms(100)); -} - -fn make_debounced( - edge: Edge, - max_wait: Option, -) -> (impl Stream, mpsc::UnboundedSender) { - let (tx, rx) = mpsc::unbounded(); - let debounced = Debounce::new( - rx, - ms(10), - edge, - max_wait.map(ms), - ) - .map_err(|e| panic!("stream error: {:?}", e)); - - (debounced, tx) -} - -fn smoke_tests( - timer: &mut Timer, - tx: mpsc::UnboundedSender, - mut s: impl Stream, -) -> Vec { - assert_not_ready!(s); - - let mut result = Vec::new(); - - // Drive forward 1ms at a time adding items to the stream - for i in 0..5 { - tx.unbounded_send(i).unwrap(); - advance(timer, ms(1)); - - match s.poll().unwrap() { - Async::Ready(Some(it)) => result.push(it), - Async::Ready(None) => break, - Async::NotReady => {}, - } - } - - // Pull final items out of stream - for _ in 0..100 { - match s.poll().unwrap() { - Async::Ready(Some(it)) => result.push(it), - Async::Ready(None) => break, - Async::NotReady => {}, - } - - advance(timer, ms(1)); - } - - advance(timer, ms(1000)); - - assert_not_ready!(s); - result -}