Skip to content

Commit

Permalink
Operator: ForEach
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Feb 14, 2013
1 parent 1086172 commit db76264
Showing 1 changed file with 57 additions and 0 deletions.
57 changes: 57 additions & 0 deletions src/test/groovy/rx/lang/groovy/ObservableTests.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,63 @@ def class ObservableTests {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}

@Test
public void testForEach() {
Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(1)).received(3);
}

@Test
public void testForEachWithComplete() {
Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)}, {}, {a.received('done')});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(1)).received(3);
verify(a, times(1)).received("done");
}

@Test
public void testForEachWithError() {
Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')}, {err -> a.received(err.message)});
verify(a, times(0)).received(1);
verify(a, times(0)).received(2);
verify(a, times(0)).received(3);
verify(a, times(1)).received("err");
verify(a, times(0)).received("done");
}

@Test
public void testForEachWithCompleteAndError() {
Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')}, {err -> a.received(err.message)}, {a.received('done')},);
verify(a, times(0)).received(1);
verify(a, times(0)).received(2);
verify(a, times(0)).received(3);
verify(a, times(1)).received("err");
verify(a, times(0)).received("done");
}

def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {

public Subscription call(final Observer<Integer> observer) {
new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(50)
}catch(Exception e) {
// ignore
}
observer.onNext(1);
observer.onNext(2);
observer.onNext(3);
observer.onCompleted();
}
}).start();
return Observable.noOpSubscription();
}
}

def class TestFactory {
int counter = 1;
Expand Down

0 comments on commit db76264

Please sign in to comment.