Skip to content

Commit

Permalink
Add new unit test to cover infinite observable being the first sequence.
Browse files Browse the repository at this point in the history
  • Loading branch information
billyy committed May 15, 2013
1 parent cfa7155 commit 658d824
Showing 1 changed file with 33 additions and 1 deletion.
34 changes: 33 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import rx.Subscription;
import rx.subscriptions.BooleanSubscription;
import rx.util.AtomicObservableSubscription;
import rx.util.Exceptions;

import rx.util.functions.Func1;

public final class OperationConcat {
Expand Down Expand Up @@ -460,6 +460,38 @@ public void testConcatConcurrentWithInfinity() {

}

@Test
public void testConcatConcurrentWithInfinityFirstSequence() {
final TestObservable<String> w1 = new TestObservable<String>("one", "two", "three");
//This observable will send "hello" MAX_VALUE time.
final TestObservable<String> w2 = new TestObservable<String>("hello", Integer.MAX_VALUE);

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
@SuppressWarnings("unchecked")
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(w2, w1);
Func1<Observer<String>, Subscription> concatF = concat(observableOfObservables);

Observable<String> concat = Observable.create(concatF);

concat.take(50).subscribe(aObserver);

//Wait for the thread to start up.
try {
Thread.sleep(25);
w2.t.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

InOrder inOrder = inOrder(aObserver);
inOrder.verify(aObserver, times(50)).onNext("hello");
verify(aObserver, times(1)).onCompleted();
verify(aObserver, never()).onError(any(Exception.class));

}


/**
* Test unsubscribing the concatenated Observable in a single thread.
Expand Down

0 comments on commit 658d824

Please sign in to comment.