diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 9838a8073bc..e2c9be1ebf4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -51,7 +51,12 @@ public static OnSubscribeFunc concat(final Iterable OnSubscribeFunc concat(final Observable> sequences) { - return new Concat(sequences); + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer t1) { + return new Concat(sequences).onSubscribe(t1); + } + }; } private static class Concat implements OnSubscribeFunc { @@ -121,8 +126,12 @@ public void onNext(Observable nextSequence) { @Override public void onError(Throwable e) { if (completedOrErred.compareAndSet(false, true)) { - if (innerSubscription != null) { - innerSubscription.unsubscribe(); + Subscription q; + synchronized (nextSequences) { + q = innerSubscription; + } + if (q != null) { + q.unsubscribe(); } observer.onError(e); } @@ -131,7 +140,11 @@ public void onError(Throwable e) { @Override public void onCompleted() { allSequencesReceived.set(true); - if (innerSubscription == null) { + Subscription q; + synchronized (nextSequences) { + q = innerSubscription; + } + if (q == null) { // We are not subscribed to any sequence, and none are coming anymore if (completedOrErred.compareAndSet(false, true)) { observer.onCompleted(); @@ -143,11 +156,14 @@ public void onCompleted() { return new Subscription() { @Override public void unsubscribe() { + Subscription q; synchronized (nextSequences) { - if (innerSubscription != null) - innerSubscription.unsubscribe(); - outerSubscription.unsubscribe(); + q = innerSubscription; + } + if (q != null) { + q.unsubscribe(); } + outerSubscription.unsubscribe(); } }; } diff --git a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java index f42d8b81ade..0f3f8f88669 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java @@ -16,7 +16,6 @@ package rx.operators; import static org.junit.Assert.*; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import static rx.operators.OperationConcat.*; @@ -33,6 +32,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.concurrency.TestScheduler; import rx.subscriptions.BooleanSubscription; public class OperationConcatTest { @@ -556,4 +556,46 @@ public void run() { return s; } } + @Test + public void testMultipleObservers() { + Observer o1 = mock(Observer.class); + Observer o2 = mock(Observer.class); + + TestScheduler s = new TestScheduler(); + + Observable timer = Observable.interval(500, TimeUnit.MILLISECONDS, s).take(2); + Observable o = Observable.concat(timer, timer); + + o.subscribe(o1); + o.subscribe(o2); + + InOrder inOrder1 = inOrder(o1); + InOrder inOrder2 = inOrder(o2); + + s.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + inOrder1.verify(o1, times(1)).onNext(0L); + inOrder2.verify(o2, times(1)).onNext(0L); + + s.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + inOrder1.verify(o1, times(1)).onNext(1L); + inOrder2.verify(o2, times(1)).onNext(1L); + + s.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + inOrder1.verify(o1, times(1)).onNext(0L); + inOrder2.verify(o2, times(1)).onNext(0L); + + s.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + inOrder1.verify(o1, times(1)).onNext(1L); + inOrder2.verify(o2, times(1)).onNext(1L); + + inOrder1.verify(o1, times(1)).onCompleted(); + inOrder2.verify(o2, times(1)).onCompleted(); + + verify(o1, never()).onError(any(Throwable.class)); + verify(o2, never()).onError(any(Throwable.class)); + } }