Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make BlockingOperatorToIterator exert backpressure. #3351

Merged
merged 2 commits into from
Oct 8, 2015

Conversation

vqvu
Copy link

@vqvu vqvu commented Sep 16, 2015

The iterator created by BlockingOperator#getIterator() doesn't exert backpressure, which causes code like this to never terminate/run out of memory

Observable.from(new Iterable<Integer>() {

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public Integer next() {
                return 1;
            }
        };
    }
}).toBlocking().getIterator().next();

This PR adds the appropriate request calls so that this works. I had to combine the implementations of Subscriber and Iterator into a single class to get access to request().

private Notification<? extends T> buf;
public SubscriberIterator() {
this.notifications = new LinkedBlockingQueue<Notification<? extends T>>();
this.buf = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting variables to their default value on construction is a no-op semantically but still mean instructions executed which causes the construction of the object to take longer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised the compiler doesn't just optimize that out...but I can remove it.

@vqvu vqvu force-pushed the blocking-iterator-backpressure branch from 57e1223 to d1a8739 Compare September 16, 2015 08:22
@vqvu
Copy link
Author

vqvu commented Sep 16, 2015

Ok. I removed the unnecessary variable initialization and added batched requests. The initial batch is RxRingBuffer.SIZE, and subsequent ones are 3 * RxRingBuffer.SIZE / 4 each (once we've received that much).

@akarnokd
Copy link
Member

Excellent! 👍

@abersnaze
Copy link
Contributor

👍

abersnaze added a commit that referenced this pull request Oct 8, 2015
Make BlockingOperatorToIterator exert backpressure.
@abersnaze abersnaze merged commit a7ba04b into ReactiveX:1.x Oct 8, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants