Skip to content

Commit

Permalink
Merge pull request ReactiveX#1833 from benjchristensen/window-tweaks
Browse files Browse the repository at this point in the history
Fix Thread Safety for Unsubscribe of Window
  • Loading branch information
benjchristensen committed Nov 8, 2014
2 parents 399cbf9 + 77a4f63 commit faa270c
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ final class ExactSubscriber extends Subscriber<T> {
final Subscriber<? super Observable<T>> child;
int count;
BufferUntilSubscriber<T> window;
Subscription parentSubscription = this;
volatile boolean noWindow = true;
final Subscription parentSubscription = this;
public ExactSubscriber(Subscriber<? super Observable<T>> child) {
/**
* See https://github.com/ReactiveX/RxJava/issues/1546
Expand All @@ -77,7 +78,7 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child) {
@Override
public void call() {
// if no window we unsubscribe up otherwise wait until window ends
if(window == null) {
if(noWindow) {
parentSubscription.unsubscribe();
}
}
Expand All @@ -94,13 +95,15 @@ public void onStart() {
@Override
public void onNext(T t) {
if (window == null) {
noWindow = false;
window = BufferUntilSubscriber.create();
child.onNext(window);
}
window.onNext(t);
if (++count % size == 0) {
window.onCompleted();
window = null;
noWindow = true;
if (child.isUnsubscribed()) {
parentSubscription.unsubscribe();
return;
Expand Down Expand Up @@ -130,7 +133,7 @@ final class InexactSubscriber extends Subscriber<T> {
final Subscriber<? super Observable<T>> child;
int count;
final List<CountedSubject<T>> chunks = new LinkedList<CountedSubject<T>>();
Subscription parentSubscription = this;
final Subscription parentSubscription = this;

public InexactSubscriber(Subscriber<? super Observable<T>> child) {
/**
Expand Down

0 comments on commit faa270c

Please sign in to comment.