
Implement stream debounce combinator #747

Merged · 20 commits · Jan 5, 2019

Conversation

NeoLegends (Contributor)

Supersedes #732; I changed the branch name for clarity. Original description below.


Coming back to rust-lang/futures-rs#210: thanks to the new runtime, I was able to start integrating these combinators into tokio.

The code is still untested and the documentation needs improvement (you are really impressive at this; as a non-native speaker I sometimes have trouble finding the right words ;)). I'd also like to add at least one more combinator (throttle / sample, which slows down the source stream but periodically emits items, e.g. for a better user experience), but I figured it's probably better to notify you earlier rather than later.

I'm very eager for feedback on the implementation (specifically the big, nested match blocks). If it can be done more clearly or simply, I'll gladly improve it. :) Be as harsh as you'd like.

@NeoLegends NeoLegends force-pushed the feature/debounce branch 2 times, most recently from e38853f to 0b6a8e1 Compare November 11, 2018 22:41
NeoLegends (Contributor Author)

I have refactored the implementation. I'm quite happy with the result now. Much less nesting and a simpler, unified implementation for both leading and trailing edge debouncing.

/// debouncing is done.
///
/// [`debounce_builder`]: #method.debounce_builder
fn debounce(self, dur: Duration) -> Debounce<Self>
NeoLegends (Contributor Author) commented Nov 11, 2018

I added this specialized combinator because I imagine this is the version most people are going to use.

We may even consider adding another combinator that sets just the max_wait in order to sample the underlying stream at a given interval. Right now, sampling functionality is integrated with debouncing, so the second combinator would be there just for convenience.

NeoLegends (Contributor Author)

I just added a sample combinator in e7784d3 because I believe the case is common as well. I'll revert the commit if you don't want it in. :)
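To make the relationship between the two entry points concrete, here is a small, self-contained sketch; the field names and how the two combinators map onto them are assumptions for illustration, not the PR's actual code:

use std::time::Duration;

#[derive(Debug)]
struct DebounceConfig {
    /// Quiet period that must elapse after the last item (debouncing).
    quiet: Option<Duration>,
    /// Upper bound on how long an item may be withheld (sampling).
    max_wait: Option<Duration>,
}

/// The common case: plain debouncing with a quiet period.
fn debounce(dur: Duration) -> DebounceConfig {
    DebounceConfig { quiet: Some(dur), max_wait: None }
}

/// The convenience case described above: only `max_wait` is set, so items
/// flow at most once per interval, but a steady source is never silenced.
fn sample(interval: Duration) -> DebounceConfig {
    DebounceConfig { quiet: None, max_wait: Some(interval) }
}

fn main() {
    println!("{:?}", debounce(Duration::from_millis(200)));
    println!("{:?}", sample(Duration::from_secs(1)));
}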

carllerche (Member)

Could there be additional documentation describing what "debounce" means? Given the documentation, I am not sure what this combinator does.

/// Either the error of the underlying stream, or an error within tokio's
/// timing machinery.
#[derive(Debug)]
pub struct DebounceError<T>(Either<T, Error>);

/// Which edge the debounce triggers on.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum Edge {
NeoLegends (Contributor Author) commented Nov 11, 2018

We could also use two flags here, but I figured this is more ergonomic and it even saves us a byte in the struct ;)
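For reference, a sketch of what such an enum might look like; only Trailing is confirmed by the tests later in this thread, so the other variants and doc comments are assumptions:

/// Which edge the debounce triggers on (sketch).
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum Edge {
    /// Emit the first item of a burst immediately, suppress the rest.
    Leading,
    /// Suppress items, emitting the last one once the source goes quiet.
    Trailing,
    /// Both of the above.
    Both,
}

The byte saved: a fieldless enum like this occupies a single byte, while two separate bool flags occupy two.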

@NeoLegends NeoLegends force-pushed the feature/debounce branch 2 times, most recently from c468660 to d7c2a66 Compare November 14, 2018 09:22
tobz (Member) commented Nov 18, 2018

@NeoLegends Any way you could add a simple test or two for this? A brand-new combinator, and one that depends on time, feels like something we should try to test.

NeoLegends (Contributor Author)

@tobz here we go (see also the throttle PR).

@NeoLegends NeoLegends changed the title [WIP] Implement stream debounce combinator Implement stream debounce combinator Nov 19, 2018
tobz (Member) commented Nov 20, 2018

@NeoLegends Wanna merge in master and deal with the conflicts? We pushed a fix that should hopefully alleviate these stupid Windows test failures.

I'm still going over the actual review portion for the code itself.

carllerche (Member)

Whoops, looks like this conflicts with the throttle one :-/

NeoLegends (Contributor Author) commented Nov 20, 2018

Rebased on current master.

EDIT: Again, I forgot to resolve a conflict.

tobz (Member) commented Nov 28, 2018

I'll take another look at this today or tomorrow hopefully.

tobz (Member) previously requested changes Dec 1, 2018

I left some initial feedback, both nit-type things and a bug I believe I found.

Most of it looks good, but I think this might need another pass once that bug is taken care of.

(Five review threads on src/util/stream.rs and tokio-timer/src/debounce.rs were marked resolved.)
d.reset(self.delay_time());

self.delay = Some(d);
self.last_item = Some(item);
tobz (Member)

I think there's a slight bug here; bear with me:

  • our stream has an item, and we start a delay
  • if we're in trailing mode, we return NotReady so the caller doesn't immediately call poll again
  • the delay eventually fires, and we return the initial item that triggered us

If any items come in between us returning NotReady the first time and the delay firing, they would never trigger us, so we'd actually miss consuming an item that came in during the waiting period. This test code exercises the edge case:

#[test]
fn debounce_trailing_many() {
    mocked(|timer, _| {
        let (mut debounced, tx) = make_debounced(Edge::Trailing, None);

        // Send in two items.
        tx.unbounded_send(1).unwrap();
        tx.unbounded_send(2).unwrap();

        // We shouldn't be ready yet, but we should have stored 2 as our last item.
        assert_not_ready!(debounced);

        // Go past our delay instant.
        advance(timer, ms(11));

        // Poll again, we should get 2.
        assert_ready_eq!(debounced, Some(2));

        // No more items in the stream, delay finished: we should be NotReady.
        assert_not_ready!(debounced);
    });
}

tobz (Member)

We obviously wouldn't lose the item, but given that debouncing works in the context of wall clock time, we wouldn't correctly be debouncing in this edge case.

NeoLegends (Contributor Author) commented Dec 2, 2018

I guess the problem is that we aren't being woken up "correctly" here. Since there is no polling between the first and the second item, there is no processing step transitioning us from storing the first to storing the second.

However, if you add a

assert_not_ready!(debounced);

between pushing the first and the second item into the stream, everything works out as expected, because the debouncer gets a chance to process the new item.

I wonder how we can solve this, and whether we even need to. To me, this seems more like an architectural issue (or quirk) with pull-based streams than with this implementation. In a real-world application one would have the debouncer running on an executor, which should wake it up twice, once for each item sent through, shouldn't it? Then we'd have no problems.

I mean, we could make the polling recursive in some cases, i.e. poll ourselves again after we have started the delay and returned NotReady. That would solve this specific case (with a finite number of elements buffered), but it would obviously lead to issues with streams that are an infinite source of items, like stream::repeat().

Any other ideas?

tobz (Member)

So, technically, you need to poll streams until they return NotReady. That's sort of the contract with consumers. When a stream is polled and has no more items to give, most implementations will store a handle to the current task and then, when a new item comes in, they notify that task so it can poll again for items.

At least the standard streams, like mpsc::Receiver, follow that contract. That's why polling until NotReady is generally important.

I think it depends on what behavior we want to expose here. If our idea of debouncing is tied to wall time (for example, if I send the first item at t=0 and we want to debounce until t=10, then items sent between t=0 and t=10 should wake me up and affect the debounce), then we always need to poll the underlying stream until NotReady to have it continue to wake us up when it has another item to give after being empty.

If we don't care about that, then we could leave it as is... although we'd end up with more of a token bucket sort of thing, I think. Since we only grab one item max before triggering the debounce delay, if there was a burst of N items, we'd actually end up debouncing N times, which, depending on your duration, could be a really drawn-out thing.

To me, that seems like it's not correct, but I might be looking at the intended behavior the wrong way.

As far as something like stream::repeat goes, yeah, no great answer there. Although, even with push streams, we would just endlessly be pushing as fast as possible, so one way or another, stream::repeat is a weird degenerate case that I'm not sure we could ever handle. max_wait could help us, but we'd be waking up or spinning endlessly whether it's push or pull.
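To make the drain-until-NotReady contract concrete, a minimal sketch in futures 0.1 style; the names are assumed and this is not the PR's implementation, which would arm the debounce delay rather than return the buffered item directly:

extern crate futures;

use futures::{Async, Poll, Stream};

/// Drain `stream` until it returns `NotReady`, keeping only the newest item.
fn drain_latest<S: Stream>(stream: &mut S) -> Poll<Option<S::Item>, S::Error> {
    let mut latest = None;
    loop {
        match stream.poll()? {
            // A newer item supersedes anything buffered earlier.
            Async::Ready(Some(item)) => latest = Some(item),
            // Source exhausted: yield whatever was buffered (a real
            // implementation would also remember that the stream ended).
            Async::Ready(None) => return Ok(Async::Ready(latest)),
            // Only on NotReady is the current task re-registered with the
            // source, so a later send will wake us.
            Async::NotReady => {
                return Ok(match latest {
                    Some(item) => Async::Ready(Some(item)),
                    None => Async::NotReady,
                })
            }
        }
    }
}

The recursive self.poll() call discussed below achieves the same effect; this loop is its iterative equivalent.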

NeoLegends (Contributor Author) commented Dec 2, 2018

> If we don't care about that, then we could leave it as is... although we'd end up with more of a token bucket sort of thing, I think. Since we only grab one item max before triggering the debounce delay, if there was a burst of N items, we'd actually end up debouncing N times, which, depending on your duration, could be a really drawn-out thing.

Doesn't this depend on how often we're woken up? Wouldn't any regular implementation wake us up N times as well?

> At least the standard streams, like mpsc::Receiver, follow that contract. That's why polling until NotReady is generally important.

Am I reading this correctly that sending into an mpsc::Receiver N times does not wake the stream up N times?

> I think it depends on what behavior we want to expose here. If our idea of debouncing is tied to wall time (for example, if I send the first item at t=0 and we want to debounce until t=10, then items sent between t=0 and t=10 should wake me up and affect the debounce), then we always need to poll the underlying stream until NotReady to have it continue to wake us up when it has another item to give after being empty.

I think we should go for wall clock time, i.e. change the impl to poll until we see NotReady.

> As far as something like stream::repeat goes, yeah, no great answer there. Although, even with push streams, we would just endlessly be pushing as fast as possible, so one way or another, stream::repeat is a weird degenerate case that I'm not sure we could ever handle. max_wait could help us, but we'd be waking up or spinning endlessly whether it's push or pull.

We could document that the underlying stream needs to return NotReady at least sometimes; that'd be okay, I think.

@carllerche what's your opinion on this?

tobz (Member) commented Dec 2, 2018

> Am I reading this correctly that sending into an mpsc::Receiver N times does not wake the stream up N times?

Yes. You can roughly see it in action here: https://docs.rs/futures/0.1.25/src/futures/sync/mpsc/mod.rs.html#931

So, to answer the question before that: no, sending N items in this case will not result in N wakeups for the task driving the debounce. If we polled until NotReady, then we may get up to N wakeups, but we would at least be consuming all N items.

If there's an available message, it's returned. If there's no message available, the task (the one receiving) is parked and notified when an item is sent in. So, if you don't drain the receiver until NotReady is reached, the receiver task is never scheduled to get a wakeup if an item is sent.
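A runnable futures 0.1 illustration of that parking behavior; the assertions encode the semantics described above:

extern crate futures;

use futures::sync::mpsc;
use futures::{future, Async, Future, Stream};

fn main() {
    let (tx, mut rx) = mpsc::unbounded::<u32>();
    tx.unbounded_send(1).unwrap();
    tx.unbounded_send(2).unwrap();

    // Stream::poll needs a task context in futures 0.1, so poll inside a
    // lazily-constructed future.
    future::lazy(move || -> Result<(), ()> {
        // Both queued items come out only because we keep polling; nothing
        // pushes them at us, and there was no wakeup per send.
        assert_eq!(rx.poll(), Ok(Async::Ready(Some(1))));
        assert_eq!(rx.poll(), Ok(Async::Ready(Some(2))));
        // Only this NotReady poll parks the task, arranging a notification
        // for whatever is sent next.
        assert_eq!(rx.poll(), Ok(Async::NotReady));
        Ok(())
    })
    .wait()
    .unwrap();

    drop(tx);
}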

> I think we should go for wall clock time, i.e. change the impl to poll until we see NotReady.

Makes sense to me.

> We could document that the underlying stream needs to return NotReady at least sometimes; that'd be okay, I think.

That sounds ideal to me.

NeoLegends (Contributor Author)

Done.

NeoLegends (Contributor Author)

Thanks for the feedback @tobz btw. :)

tobz dismissed their stale review on December 3, 2018, 00:06:

no longer relevant

tobz (Member) commented Dec 3, 2018

I took another look. After thinking through it, I believe the recursive self.poll() call should work.

I'm gonna wait until someone else can take a look at this. I appreciate all the time you've spent, and your work addressing my feedback, but I'm worried that even I'm getting confused and potentially missing subtle corner cases where the combinator might behave unexpectedly.

tobz (Member) left a comment

I'm gonna approve/merge this.

It's been sitting around wayyyy too long -- my apologies -- and I think even if we discover a small bug or any refactoring to do, it'll still be providing value and the API is solid enough to not require changes.

Thanks again for all of your work on this.

@tobz tobz merged commit 7a49ebb into tokio-rs:master Jan 5, 2019
NeoLegends (Contributor Author) commented Jan 5, 2019

Great! Thanks for merging, and for your feedback. I certainly learned a lot about the Poll<T, E> contract.

carllerche (Member) left a comment

Thanks for the PR. I provided some additional thoughts; ideally they could be fixed in a follow-up.

@@ -0,0 +1,408 @@
//! Debounce streams on the leading or trailing edge or both edges for a certain
carllerche (Member)

This implementation should not be in tokio-timer. The `tokio-timer` crate should be reserved for foundational types that either cannot be implemented outside of the crate or have trivial / obvious implementations.

I do not believe debounce satisfies this, so it should live in tokio proper.

NeoLegends (Contributor Author)

Would you be fine with the tests remaining in tokio_timer? Currently they require some code from tokio_timer::tests::support to run.


/// Builds a debouncing stream.
#[derive(Debug)]
pub struct DebounceBuilder<T> {
carllerche (Member)

This should be named Builder and be referred to as debounce::Builder. In general, idiomatic Rust tries to avoid stuttering (debounce::DebounceBuilder).

/// Care must be taken that this stream returns `Async::NotReady` at some point,
/// otherwise the debouncing implementation will overflow the stack during
/// `.poll()` (i.e. don't use this directly on `stream::repeat`).
fn debounce_builder(self) -> DebounceBuilder<Self>
carllerche (Member)

This probably shouldn't be a trait fn. The builder can be constructed directly with Builder::new.
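A hypothetical sketch of the suggested shape, with an assumed field set:

mod debounce {
    use std::time::Duration;

    /// Builds a debouncing stream. Referred to as `debounce::Builder`,
    /// avoiding the `debounce::DebounceBuilder` stutter.
    #[derive(Debug)]
    pub struct Builder<S> {
        stream: S,
        max_wait: Option<Duration>,
    }

    impl<S> Builder<S> {
        /// Direct construction, replacing the `debounce_builder` trait fn.
        pub fn new(stream: S) -> Self {
            Builder { stream, max_wait: None }
        }

        pub fn max_wait(mut self, dur: Duration) -> Self {
            self.max_wait = Some(dur);
            self
        }
    }
}

fn main() {
    use std::time::Duration;

    // `S` is unconstrained in this sketch, so any Debug value stands in
    // for a stream.
    let builder = debounce::Builder::new("stream-stand-in")
        .max_wait(Duration::from_secs(1));
    println!("{:?}", builder);
}

Callers then write debounce::Builder::new(stream), and the trait surface stays small.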

@NeoLegends NeoLegends deleted the feature/debounce branch January 6, 2019 13:07
NeoLegends (Contributor Author)

Thanks @carllerche for your feedback. I will address the issues in a follow-up PR.

carllerche added a commit that referenced this pull request Jan 6, 2019
carllerche added a commit that referenced this pull request Jan 7, 2019
This reverts commit 7a49ebb.

The commit conflicted with another change that was merged, causing CI to fail. The public API
also requires a bit more refinement (#833) and Tokio crates need to be released.
joaomoreno

@NeoLegends @carllerche This got reverted, unfortunately. Any plans to bring it back?

NeoLegends (Contributor Author)

Yes. I currently have a use case for this.
