Skip to content

Commit

Permalink
util: implement stream debounce combinator (#747)
Browse files Browse the repository at this point in the history
  • Loading branch information
NeoLegends authored and tobz committed Jan 5, 2019
1 parent a687922 commit 7a49ebb
Show file tree
Hide file tree
Showing 4 changed files with 647 additions and 1 deletion.
60 changes: 59 additions & 1 deletion src/util/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#[cfg(feature = "timer")]
use tokio_timer::{
debounce::{Debounce, DebounceBuilder, Edge},
throttle::Throttle,
Timeout,
timeout::Timeout,
};

use futures::Stream;
Expand All @@ -24,6 +25,63 @@ use std::time::Duration;
///
/// [`timeout`]: #method.timeout
pub trait StreamExt: Stream {
/// Debounce the stream on the trailing edge using the given duration.
///
/// Errors will pass through without being debounced. Debouncing will
/// happen on the trailing edge. This means all items (except the last
/// one) will be discarded until the delay has elapsed without an item
/// being passed through. The last item that was passed through will
/// be returned.
///
/// Care must be taken that this stream returns `Async::NotReady` at some point,
/// otherwise the debouncing implementation will overflow the stack during
/// `.poll()` (i. e. don't use this directly on `stream::repeat`).
///
/// See also [`debounce_builder`], which allows more configuration over how the
/// debouncing is done.
///
/// [`debounce_builder`]: #method.debounce_builder
fn debounce(self, dur: Duration) -> Debounce<Self>
where Self: Sized
{
self.debounce_builder()
.duration(dur)
.edge(Edge::Trailing)
.build()
}

/// Create a builder that builds a debounced version of this stream.
///
/// The returned builder can be used to configure the debouncing process.
///
/// Care must be taken that this stream returns `Async::NotReady` at some point,
/// otherwise the debouncing implementation will overflow the stack during
/// `.poll()` (i. e. don't use this directly on `stream::repeat`).
fn debounce_builder(self) -> DebounceBuilder<Self>
where Self: Sized
{
DebounceBuilder::from_stream(self)
}

/// Sample the stream at the given `interval`.
///
/// Sampling works similar to debouncing in that frequent values will be
/// ignored. Sampling, however, ensures that an item is passed through at
/// least after every `interval`. Debounce, on the other hand, would not
/// pass items through until there has been enough "silence".
///
/// Care must be taken that this stream returns `Async::NotReady` at some point,
/// otherwise the sampling implementation will overflow the stack during
/// `.poll()` (i. e. don't use this directly on `stream::repeat`).
fn sample(self, interval: Duration) -> Debounce<Self>
where Self: Sized
{
self.debounce_builder()
.max_wait(interval)
.edge(Edge::Leading)
.build()
}

/// Throttle down the stream by enforcing a fixed delay between items.
///
/// Errors are also delayed.
Expand Down
Loading

0 comments on commit 7a49ebb

Please sign in to comment.