Skip to content

Commit

Permalink
Fix violations of the Observer contract.
Browse files Browse the repository at this point in the history
  • Loading branch information
johngmyers committed Mar 29, 2013
1 parent b2697cb commit 550005c
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions rxjava-core/src/main/java/rx/operators/OperationTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static rx.testing.TrustedObservableTester.assertTrustedObservable;

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
Expand Down Expand Up @@ -97,12 +98,16 @@ public ItemObserver(Observer<T> observer) {

@Override
public void onCompleted() {
observer.onCompleted();
if (counter.getAndSet(num) < num) {
observer.onCompleted();
}
}

@Override
public void onError(Exception e) {
observer.onError(e);
if (counter.getAndSet(num) < num) {
observer.onError(e);
}
}

@Override
Expand All @@ -129,7 +134,7 @@ public static class UnitTest {
@Test
public void testTake1() {
Observable<String> w = Observable.toObservable("one", "two", "three");
Observable<String> take = Observable.create(take(w, 2));
Observable<String> take = Observable.create(assertTrustedObservable(take(w, 2)));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand All @@ -144,7 +149,7 @@ public void testTake1() {
@Test
public void testTake2() {
Observable<String> w = Observable.toObservable("one", "two", "three");
Observable<String> take = Observable.create(take(w, 1));
Observable<String> take = Observable.create(assertTrustedObservable(take(w, 1)));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand All @@ -158,14 +163,23 @@ public void testTake2() {

@Test
public void testTakeDoesntLeakErrors() {
Observable<String> source = Observable.concat(Observable.from("one"), Observable.<String>error(new Exception("test failed")));
Observable.create(take(source, 1)).last();
Observable<String> source = Observable.create(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
observer.onNext("one");
observer.onError(new Exception("test failed"));
return Subscriptions.empty();
}
});
Observable.create(assertTrustedObservable(take(source, 1))).last();
}

@Test
public void testTakeZeroDoesntLeakError() {
Observable<String> source = Observable.<String>error(new Exception("test failed"));
Observable.create(take(source, 0)).lastOrDefault("ok");
Observable<String> source = Observable.error(new Exception("test failed"));
Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok");
}

@Test
Expand All @@ -175,7 +189,7 @@ public void testUnsubscribeAfterTake() {

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Observable<String> take = Observable.create(take(w, 1));
Observable<String> take = Observable.create(assertTrustedObservable(take(w, 1)));
take.subscribe(aObserver);

// wait for the Observable to complete
Expand Down

0 comments on commit 550005c

Please sign in to comment.