Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backpressure for window(size) #2820

Merged
merged 2 commits into from
Apr 30, 2015

Conversation

akarnokd
Copy link
Member

Changes to the window(size) operator to respect the backpressure on its outer Observable: asking for 1 window will request size values from upstream.

Backpressure is ignored on the inner Observable for now, partially because the BufferUntilSubscriber doesn't support it, partially because coordinating the inner requests with the outer requests needs more thoughts. The problem is that the outer has to request at least 1 element from upstream in order to open the window, but the inner subscriber may not want that single element just yet or would request more than the remaining window size and it would trigger new windows whose value it can't receive but would overflow the next window's observers.

window(size, skip) is not changed as I need to think about it more.

@DavidMGross
Copy link
Collaborator

Could you change the javadoc for this operator in Observable.java as well: http://reactivex.io/RxJava/javadoc/rx/Observable.html#window(int)

It currently reads:

Backpressure Support:
This operator does not support backpressure as it uses count to control data flow.

@akarnokd
Copy link
Member Author

Sure.

@benjchristensen
Copy link
Member

Isn't supporting backpressure on the inner effectively the same as 'onBackpressureBuffer'?

@akarnokd
Copy link
Member Author

Yes. BufferUntilSubscriber would need to be smartened to do proper backpressure.

@benjchristensen
Copy link
Member

What has happened recently causing tests to fail? I don't see any recent commits at should do it. Ideas?

@akarnokd
Copy link
Member Author

I have no idea. I thought it was my commit for testOnBackpressureDropWithAction but reverting that didn't fix the master. My best guess is that something changed in the Travis-CI environment; perhaps we don't get 3GB memory anymore (on Windows, the tests take up to 1GB of memory).

@benjchristensen
Copy link
Member

So to confirm, merging this would be adding back pressure on the outer but still need us to come back later and fix the inner?

@akarnokd
Copy link
Member Author

Yes.

child.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, why not use BackpressureUtils here? Is it because you are just passing the value through rather than maintaining the AtomicLong in this class?

benjchristensen added a commit that referenced this pull request Apr 30, 2015
@benjchristensen benjchristensen merged commit f1bd2a9 into ReactiveX:1.x Apr 30, 2015
@benjchristensen benjchristensen mentioned this pull request Apr 30, 2015
@akarnokd akarnokd deleted the WindowWithSizeBackpressure branch September 9, 2015 15:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants