Skip to content

Commit

Permalink
Merge pull request ReactiveX#346 from benjchristensen/BlockingObservable
Browse files Browse the repository at this point in the history
BlockingObservable Refactor
  • Loading branch information
benjchristensen committed Sep 5, 2013
2 parents 00b199a + 746f762 commit 2d5d74e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 285 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,16 @@
(.onNext o 99)
(.onCompleted o)
(rx.subscriptions.Subscriptions/empty)))
(BlockingObservable/single)))))
.toBlockingObservable
.single))))

(testing "can pass rx/fn to map and friends"
(is (= (+ 1 4 9)
(-> (Observable/from [1 2 3])
(.map (rx/fn [v] (* v v)))
(.reduce (rx/fn* +))
(BlockingObservable/single)))))
.toBlockingObservable
.single))))

(testing "can pass rx/action to subscribe and friends"
(let [finally-called (atom nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,12 @@ def class ObservableTests {
return Observable.from(1, 3, 2, 5, 4);
}

public TestObservable getObservable() {
return new TestObservable(counter++);
public TestOnSubscribe getOnSubscribe() {
return new TestOnSubscribe(counter++);
}

public Observable getObservable() {
return Observable.create(getOnSubscribe());
}
}

Expand All @@ -335,14 +339,14 @@ def class ObservableTests {
public void received(Object o);
}

def class TestObservable extends Observable<String> {
def class TestOnSubscribe implements OnSubscribeFunc<String> {
private final int count;

public TestObservable(int count) {
public TestOnSubscribe(int count) {
this.count = count;
}

public Subscription subscribe(Observer<String> observer) {
public Subscription onSubscribe(Observer<String> observer) {

observer.onNext("hello_" + count);
observer.onCompleted();
Expand Down
Loading

0 comments on commit 2d5d74e

Please sign in to comment.