Skip to content

Commit

Permalink
Merge pull request #745 from zsxwing/issue-737
Browse files Browse the repository at this point in the history
Fixed issue #737
  • Loading branch information
benjchristensen committed Jan 14, 2014
2 parents 57376d0 + 5fe7d1d commit 68d7600
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 16 deletions.
41 changes: 25 additions & 16 deletions rxjava-core/src/main/java/rx/operators/OperationSwitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Func1;

/**
Expand Down Expand Up @@ -62,8 +62,7 @@ public Subscription onSubscribe(Observer<? super T> observer) {
SafeObservableSubscription parent;
parent = new SafeObservableSubscription();

MultipleAssignmentSubscription child;
child = new MultipleAssignmentSubscription();
SerialSubscription child = new SerialSubscription();

parent.wrap(sequences.subscribe(new SwitchObserver<T>(observer, parent, child)));

Expand All @@ -76,13 +75,13 @@ private static class SwitchObserver<T> implements Observer<Observable<? extends
private final Object gate;
private final Observer<? super T> observer;
private final SafeObservableSubscription parent;
private final MultipleAssignmentSubscription child;
private final SerialSubscription child;
private long latest;
private boolean stopped;
private boolean hasLatest;

public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent,
MultipleAssignmentSubscription child) {
SerialSubscription child) {
this.observer = observer;
this.parent = parent;
this.child = child;
Expand All @@ -97,8 +96,7 @@ public void onNext(Observable<? extends T> args) {
this.hasLatest = true;
}

final SafeObservableSubscription sub;
sub = new SafeObservableSubscription();
final SafeObservableSubscription sub = new SafeObservableSubscription();
sub.wrap(args.subscribe(new Observer<T>() {
@Override
public void onNext(T args) {
Expand All @@ -111,28 +109,35 @@ public void onNext(T args) {

@Override
public void onError(Throwable e) {
sub.unsubscribe();
SafeObservableSubscription s = null;
synchronized (gate) {
sub.unsubscribe();
if (latest == id) {
SwitchObserver.this.observer.onError(e);
SwitchObserver.this.parent.unsubscribe();
s = SwitchObserver.this.parent;
}
}
if(s != null) {
s.unsubscribe();
}
}

@Override
public void onCompleted() {
sub.unsubscribe();
SafeObservableSubscription s = null;
synchronized (gate) {
sub.unsubscribe();
if (latest == id) {
SwitchObserver.this.hasLatest = false;
}

if (stopped) {
SwitchObserver.this.observer.onCompleted();
SwitchObserver.this.parent.unsubscribe();
if (stopped) {
SwitchObserver.this.observer.onCompleted();
s = SwitchObserver.this.parent;
}
}

}
if(s != null) {
s.unsubscribe();
}
}

Expand All @@ -152,13 +157,17 @@ public void onError(Throwable e) {

@Override
public void onCompleted() {
SafeObservableSubscription s = null;
synchronized (gate) {
this.stopped = true;
if (!this.hasLatest) {
this.observer.onCompleted();
this.parent.unsubscribe();
s = this.parent;
}
}
if(s != null) {
s.unsubscribe();
}
}

}
Expand Down
47 changes: 47 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,4 +380,51 @@ public void call() {
@SuppressWarnings("serial")
private class TestException extends Throwable {
}

@Test
public void testSwitchIssue737() {
// https://github.com/Netflix/RxJava/issues/737
Observable<Observable<String>> source = Observable.create(new Observable.OnSubscribeFunc<Observable<String>>() {
@Override
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
publishNext(observer, 0, Observable.create(new Observable.OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 10, "1-one");
publishNext(observer, 20, "1-two");
// The following events will be ignored
publishNext(observer, 30, "1-three");
publishCompleted(observer, 40);
return Subscriptions.empty();
}
}));
publishNext(observer, 25, Observable.create(new Observable.OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 10, "2-one");
publishNext(observer, 20, "2-two");
publishNext(observer, 30, "2-three");
publishCompleted(observer, 40);
return Subscriptions.empty();
}
}));
publishCompleted(observer, 30);
return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
sampled.subscribe(observer);

scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("1-one");
inOrder.verify(observer, times(1)).onNext("1-two");
inOrder.verify(observer, times(1)).onNext("2-one");
inOrder.verify(observer, times(1)).onNext("2-two");
inOrder.verify(observer, times(1)).onNext("2-three");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}
}

0 comments on commit 68d7600

Please sign in to comment.