Skip to content

Commit

Permalink
Merge pull request #1829 from benjchristensen/1546-window
Browse files Browse the repository at this point in the history
Fix Window by Count Unsubscribe Behavior
  • Loading branch information
benjchristensen committed Nov 6, 2014
2 parents fddc923 + 71b4e7a commit 18d6f3e
Show file tree
Hide file tree
Showing 6 changed files with 878 additions and 570 deletions.
97 changes: 74 additions & 23 deletions src/main/java/rx/internal/operators/OperatorWindowWithSize.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import rx.Observable;
import rx.Observable.Operator;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
import rx.Observer;
import rx.Subscriber;

Expand Down Expand Up @@ -56,11 +60,29 @@ public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
final class ExactSubscriber extends Subscriber<T> {
final Subscriber<? super Observable<T>> child;
int count;
Observer<T> consumer;
Observable<T> producer;
BufferUntilSubscriber<T> window;
Subscription parentSubscription = this;
public ExactSubscriber(Subscriber<? super Observable<T>> child) {
super(child);
/**
* See https://github.com/ReactiveX/RxJava/issues/1546
* We cannot compose through a Subscription because unsubscribing
* applies to the outer, not the inner.
*/
this.child = child;
/*
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
*/
child.add(Subscriptions.create(new Action0() {

@Override
public void call() {
// if no window we unsubscribe up otherwise wait until window ends
if(window == null) {
parentSubscription.unsubscribe();
}
}

}));
}

@Override
Expand All @@ -71,45 +93,66 @@ public void onStart() {

@Override
public void onNext(T t) {
if (count++ % size == 0) {
if (consumer != null) {
consumer.onCompleted();
if (window == null) {
window = BufferUntilSubscriber.create();
child.onNext(window);
}
window.onNext(t);
if (++count % size == 0) {
window.onCompleted();
window = null;
if (child.isUnsubscribed()) {
parentSubscription.unsubscribe();
return;
}
createNewWindow();
child.onNext(producer);
}
consumer.onNext(t);
}

@Override
public void onError(Throwable e) {
if (consumer != null) {
consumer.onError(e);
if (window != null) {
window.onError(e);
}
child.onError(e);
}

@Override
public void onCompleted() {
if (consumer != null) {
consumer.onCompleted();
if (window != null) {
window.onCompleted();
}
child.onCompleted();
}
void createNewWindow() {
final BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
consumer = bus;
producer = bus;
}
}

/** Subscriber with inexact, possibly overlapping or skipping windows. */
final class InexactSubscriber extends Subscriber<T> {
final Subscriber<? super Observable<T>> child;
int count;
final List<CountedSubject<T>> chunks;
final List<CountedSubject<T>> chunks = new LinkedList<CountedSubject<T>>();
Subscription parentSubscription = this;

public InexactSubscriber(Subscriber<? super Observable<T>> child) {
/**
* See https://github.com/ReactiveX/RxJava/issues/1546
* We cannot compose through a Subscription because unsubscribing
* applies to the outer, not the inner.
*/
this.child = child;
this.chunks = new LinkedList<CountedSubject<T>>();
/*
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
*/
child.add(Subscriptions.create(new Action0() {

@Override
public void call() {
// if no window we unsubscribe up otherwise wait until window ends
if (chunks == null || chunks.size() == 0) {
parentSubscription.unsubscribe();
}
}

}));
}

@Override
Expand All @@ -121,10 +164,13 @@ public void onStart() {
@Override
public void onNext(T t) {
if (count++ % skip == 0) {
CountedSubject<T> cs = createCountedSubject();
chunks.add(cs);
child.onNext(cs.producer);
if (!child.isUnsubscribed()) {
CountedSubject<T> cs = createCountedSubject();
chunks.add(cs);
child.onNext(cs.producer);
}
}

Iterator<CountedSubject<T>> it = chunks.iterator();
while (it.hasNext()) {
CountedSubject<T> cs = it.next();
Expand All @@ -134,6 +180,10 @@ public void onNext(T t) {
cs.consumer.onCompleted();
}
}
if (chunks.size() == 0 && child.isUnsubscribed()) {
parentSubscription.unsubscribe();
return;
}
}

@Override
Expand All @@ -155,6 +205,7 @@ public void onCompleted() {
}
child.onCompleted();
}

CountedSubject<T> createCountedSubject() {
final BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
return new CountedSubject<T>(bus, bus);
Expand Down
Loading

0 comments on commit 18d6f3e

Please sign in to comment.