diff --git a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java index 9094a2affd..634801f19b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java @@ -20,7 +20,7 @@ import rx.Observer; import rx.Subscription; import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.MultipleAssignmentSubscription; +import rx.subscriptions.SerialSubscription; import rx.util.functions.Func1; /** @@ -62,8 +62,7 @@ public Subscription onSubscribe(Observer observer) { SafeObservableSubscription parent; parent = new SafeObservableSubscription(); - MultipleAssignmentSubscription child; - child = new MultipleAssignmentSubscription(); + SerialSubscription child = new SerialSubscription(); parent.wrap(sequences.subscribe(new SwitchObserver(observer, parent, child))); @@ -76,13 +75,13 @@ private static class SwitchObserver implements Observer observer; private final SafeObservableSubscription parent; - private final MultipleAssignmentSubscription child; + private final SerialSubscription child; private long latest; private boolean stopped; private boolean hasLatest; public SwitchObserver(Observer observer, SafeObservableSubscription parent, - MultipleAssignmentSubscription child) { + SerialSubscription child) { this.observer = observer; this.parent = parent; this.child = child; @@ -97,8 +96,7 @@ public void onNext(Observable args) { this.hasLatest = true; } - final SafeObservableSubscription sub; - sub = new SafeObservableSubscription(); + final SafeObservableSubscription sub = new SafeObservableSubscription(); sub.wrap(args.subscribe(new Observer() { @Override public void onNext(T args) { @@ -111,28 +109,35 @@ public void onNext(T args) { @Override public void onError(Throwable e) { + sub.unsubscribe(); + SafeObservableSubscription s = null; synchronized (gate) { - sub.unsubscribe(); if (latest == id) { SwitchObserver.this.observer.onError(e); - SwitchObserver.this.parent.unsubscribe(); + s = SwitchObserver.this.parent; } } + if(s != null) { + s.unsubscribe(); + } } @Override public void onCompleted() { + sub.unsubscribe(); + SafeObservableSubscription s = null; synchronized (gate) { - sub.unsubscribe(); if (latest == id) { SwitchObserver.this.hasLatest = false; - } - if (stopped) { - SwitchObserver.this.observer.onCompleted(); - SwitchObserver.this.parent.unsubscribe(); + if (stopped) { + SwitchObserver.this.observer.onCompleted(); + s = SwitchObserver.this.parent; + } } - + } + if(s != null) { + s.unsubscribe(); } } @@ -152,13 +157,17 @@ public void onError(Throwable e) { @Override public void onCompleted() { + SafeObservableSubscription s = null; synchronized (gate) { this.stopped = true; if (!this.hasLatest) { this.observer.onCompleted(); - this.parent.unsubscribe(); + s = this.parent; } } + if(s != null) { + s.unsubscribe(); + } } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java b/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java index 8afa5557f1..76490f1cec 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java @@ -380,4 +380,51 @@ public void call() { @SuppressWarnings("serial") private class TestException extends Throwable { } + + @Test + public void testSwitchIssue737() { + // https://github.com/Netflix/RxJava/issues/737 + Observable> source = Observable.create(new Observable.OnSubscribeFunc>() { + @Override + public Subscription onSubscribe(Observer> observer) { + publishNext(observer, 0, Observable.create(new Observable.OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + publishNext(observer, 10, "1-one"); + publishNext(observer, 20, "1-two"); + // The following events will be ignored + publishNext(observer, 30, "1-three"); + publishCompleted(observer, 40); + return Subscriptions.empty(); + } + })); + publishNext(observer, 25, Observable.create(new Observable.OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + publishNext(observer, 10, "2-one"); + publishNext(observer, 20, "2-two"); + publishNext(observer, 30, "2-three"); + publishCompleted(observer, 40); + return Subscriptions.empty(); + } + })); + publishCompleted(observer, 30); + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + sampled.subscribe(observer); + + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("1-one"); + inOrder.verify(observer, times(1)).onNext("1-two"); + inOrder.verify(observer, times(1)).onNext("2-one"); + inOrder.verify(observer, times(1)).onNext("2-two"); + inOrder.verify(observer, times(1)).onNext("2-three"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } }