Window Unsubscribes Early? #1546

Closed
benjchristensen opened this issue Aug 4, 2014 · 3 comments

@benjchristensen
Member

The following example should always have 10 items in every window, including the last one, but the last window non-deterministically has fewer, as if the unsubscribe from take happens immediately instead of letting the window finish.

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class WindowExample {

    public static void main(String args[]) {
        // window 10 items at a time (using 999999999 to mark the start of each window's output)
        hotStream().window(10).take(10).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println);
    }

    /**
     * This is an artificial source to demonstrate an infinite stream that bursts intermittently
     */
    public static Observable<Integer> hotStream() {
        return Observable.create((Subscriber<? super Integer> s) -> {
            while (!s.isUnsubscribed()) {
                // burst some number of items
                for (int i = 0; i < Math.random() * 20; i++) {
                    s.onNext(i);
                }
                try {
                    // sleep for a random amount of time
                    // NOTE: Only using Thread.sleep here as an artificial demo.
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (Exception e) {
                    // do nothing
                }
            }
        }).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block
    }

}
@benjchristensen benjchristensen added this to the 1.0 milestone Aug 4, 2014
@benjchristensen benjchristensen modified the milestones: 1.x, 1.0 Sep 23, 2014
@benjchristensen
Member Author

This is a bug, but it has been a bug for a while and doesn't affect public APIs, so I'm moving it from the 1.0 milestone to 1.x.

@benjchristensen benjchristensen modified the milestones: 1.x, 1.0.x, 1.0 Oct 7, 2014
@akarnokd
Member

What happens is that take(10) shuts down the emitter thread as soon as it receives the 10th Observable, so the downstream receives a random amount of data before the emitter next checks whether its client has unsubscribed. Changing it to buffer(10) gives the required 10 items and doesn't hang.

        hotStream().buffer(10).take(5).flatMap(w -> {
            return Observable.from(w).startWith(999999999);
        }).toBlocking().forEach(System.out::println);

@akarnokd
Member

The hang happens because the flatMap never receives an onCompleted from the emitter and is thus unable to release the latch in the blocking forEach.
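
To make the mechanism described above concrete, here is a minimal single-threaded sketch in plain Java. It is not RxJava code and does not reproduce the operator's implementation; the class `WindowUnsubscribeSketch`, the method `simulate`, and the `deferUnsubscribe` flag are all hypothetical names introduced for illustration. It assumes a source that emits fixed-size bursts and checks the unsubscribed flag only between bursts, as `hotStream()` does, and compares two policies: unsubscribing as soon as the last window is handed downstream (the behaviour described in this issue), and, as one possible alternative, waiting until that window is full.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowUnsubscribeSketch {

    /**
     * Simplified model of window(windowSize).take(windowsToTake) over a bursty source.
     * All names here are hypothetical; this is not how RxJava implements the operator.
     */
    static List<List<Integer>> simulate(int windowSize, int windowsToTake,
                                        int burstSize, boolean deferUnsubscribe) {
        List<List<Integer>> windows = new ArrayList<>();
        List<Integer> current = null;
        boolean unsubscribed = false;
        int next = 0;

        // Like hotStream(), the source checks the flag once per burst, not per item.
        while (!unsubscribed) {
            for (int i = 0; i < burstSize; i++) {
                int item = next++;
                boolean needWindow = current == null || current.size() == windowSize;
                if (needWindow) {
                    if (windows.size() == windowsToTake) {
                        continue; // take(n) already has all its windows; drop the item
                    }
                    current = new ArrayList<>();
                    windows.add(current);
                    if (windows.size() == windowsToTake && !deferUnsubscribe) {
                        // Eager policy: unsubscribe the moment the last window is emitted.
                        unsubscribed = true;
                    }
                }
                current.add(item);
                if (deferUnsubscribe && windows.size() == windowsToTake
                        && current.size() == windowSize) {
                    // Deferred policy: unsubscribe only once the last window is full.
                    unsubscribed = true;
                }
            }
        }
        return windows;
    }

    static int lastWindowSize(List<List<Integer>> windows) {
        return windows.get(windows.size() - 1).size();
    }

    public static void main(String[] args) {
        // Bursts of 7 items, windows of 10, take 3 windows.
        System.out.println("eager:    " + lastWindowSize(simulate(10, 3, 7, false))); // prints "eager:    1"
        System.out.println("deferred: " + lastWindowSize(simulate(10, 3, 7, true)));  // prints "deferred: 10"
    }
}
```

With bursts of 7, the third window opens on the last item of a burst, so under the eager policy the source stops at its next check and the window is left with a single item and never completes. In the real example the burst sizes are random, which is why the size of the short last window varies from run to run.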

benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Nov 6, 2014
Split the unit tests up to match the implementation files.
Add unit tests for ReactiveX#1546 to OperatorWindowWithSizeTest
benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Nov 6, 2014
benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Nov 6, 2014
- ReactiveX#1546
- This also fixes the fact that the overlapping window overload was not propagating unsubscribe before. A new unit test caught that.