From 658d824b9f632f79a2b29979a6dfbb1a0a925783 Mon Sep 17 00:00:00 2001 From: Billy Yuen Date: Wed, 15 May 2013 11:37:43 -0700 Subject: [PATCH] Add new unit test to cover infinite observable being the first sequence. --- .../java/rx/operators/OperationConcat.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 625fb4cd3e..6f3b8902f5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -36,7 +36,7 @@ import rx.Subscription; import rx.subscriptions.BooleanSubscription; import rx.util.AtomicObservableSubscription; -import rx.util.Exceptions; + import rx.util.functions.Func1; public final class OperationConcat { @@ -460,6 +460,38 @@ public void testConcatConcurrentWithInfinity() { } + @Test + public void testConcatConcurrentWithInfinityFirstSequence() { + final TestObservable w1 = new TestObservable("one", "two", "three"); + //This observable will send "hello" MAX_VALUE time. + final TestObservable w2 = new TestObservable("hello", Integer.MAX_VALUE); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + @SuppressWarnings("unchecked") + TestObservable> observableOfObservables = new TestObservable>(w2, w1); + Func1, Subscription> concatF = concat(observableOfObservables); + + Observable concat = Observable.create(concatF); + + concat.take(50).subscribe(aObserver); + + //Wait for the thread to start up. + try { + Thread.sleep(25); + w2.t.join(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(50)).onNext("hello"); + verify(aObserver, times(1)).onCompleted(); + verify(aObserver, never()).onError(any(Exception.class)); + + } + /** * Test unsubscribing the concatenated Observable in a single thread.