Skip to content

Commit

Permalink
rx.Single
Browse files Browse the repository at this point in the history
Adds `rx.Single` as an "Observable Future" for representing work with a single return value.

See ReactiveX#1594 rx.Future/Task/Async/Single

This provides a type similar to `Future` in that it represents a scalar unit of work, but it is lazy like an `Observable` and many `Single`s can be combined into an `Observable` stream. Note how `Single.zip` returns `Single<R>` whereas `Single.merge` returns `Observable<R>`.

Examples of using this class:

```java
import rx.Observable;
import rx.Single;

public class TaskExamples {

    public static void main(String... args) {
        // scalar synchronous value
        Single<String> t1 = Single.create(t -> {
            t.onSuccess("Hello World!");
        });

        // scalar synchronous value using helper method
        Single<Integer> t2 = Single.just(1);

        // synchronous error
        Single<String> error = Single.create(t -> {
            t.onError(new RuntimeException("failed!"));
        });

        // executing
        t1.subscribe(System.out::println);
        t2.subscribe(System.out::println);
        error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));

        // scalar Singles for request/response like a Future
        getData(1).subscribe(System.out::println);

        // combining Tasks into another Task
        Single<String> zipped = Single.zip(t1, t2, (a, b) -> a + " -- " + b);

        // combining Singles into an Observable stream
        Observable<String> merged = Single.merge(t1, t2.map(String::valueOf), getData(3));
        Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));

        zipped.subscribe(v -> System.out.println("zipped => " + v));
        merged.subscribe(v -> System.out.println("merged => " + v));
        mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
    }

    /**
     * Example of an async scalar execution using Single.create
     * <p>
     * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
     *
     * @param arg
     * @return
     */
    public static Single<String> getData(int arg) {
        return Single.create(s -> {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                    s.onSuccess("Data=" + arg);
                }).start();
        });
    }

}
```
  • Loading branch information
benjchristensen committed Jun 9, 2015
1 parent f956293 commit 4394e76
Show file tree
Hide file tree
Showing 3 changed files with 2,145 additions and 0 deletions.
Loading

0 comments on commit 4394e76

Please sign in to comment.