rx.Future/Task/Async/Single #1594

Closed
benjchristensen opened this issue Aug 15, 2014 · 31 comments

@benjchristensen
Member

Explore whether an rx.Future makes sense to allow better representation of Scalar responses.


Comment below explaining reasoning: #1594 (comment)

@Matthias247

I would really love to see this, for two reasons:

  • There is no good (async and composable) Future type in the Java standard library if you have to target < Java 8. Even where CompletableFuture is available, it uses executors for the continuations. For an RxJava integration, an Rx Scheduler is a better match: we could then receive Observable events and Future continuations on the same scheduler.
  • The alternative is using an AsyncSubject. The problem with AsyncSubject is that the user only sees an Observable in the API and so won't know that it emits only a single item. They also won't know whether it is hot (the function call that returns the Observable immediately starts an operation without the need for Subscribe()) or not (needs Subscribe()). Therefore I think the ability to model such single-return-value async operations with Futures makes an API easier to understand.

@headinthebox
Contributor

https://github.com/BoltsFramework/Bolts-Java is the Java version of .NET's Task.
Rx.net has seamless integration between observable and task.

However without async await syntax, tasks/futures do not provide much advantage. I don't ever expect async await to be added to java BTW.

@Nilzor

Nilzor commented Nov 24, 2014

Rx.net has seamless integration between observable and task.

Yes, but RxJava hasn't. Integrating or interfacing with Bolts could be an option instead of a separate implementation, but it would probably require some changes to Bolts as well. Bolts, unlike RxJava, relies on Executors.

However without async await syntax, tasks/futures do not provide much advantage

Over what? Observables? Simpler syntax and easier learning curve would be one. Right now I'm using Observables for Scalar data (HTTP responses) because that's the only thing Retrofit supports, but it's been a challenge to sift through the wiki finding just the methods I need for these kind of tasks. continueWith=flatMap? Not intuitive. I simply feel I'm using the wrong tool for the job.

@benjchristensen
Member Author

it's been a challenge to sift through the wiki finding just the methods I need for these kind of tasks

The flatMap operator would almost certainly exist on the rx.Future. For example, see the Twitter Future that has flatMap: https://twitter.github.io/util/docs/index.html#com.twitter.util.Future

What makes the use of flatMap not intuitive on an Observable that has a single value versus multiple values?

@Nilzor

Nilzor commented Nov 24, 2014

What makes the use of flatMap not intuitive on an Observable that has a single value versus multiple values?

First and foremost the naming. "FlatMap" doesn't tell me anything. What's "unflat" about a Scalar value? What part of the operation involves mapping of any kind? Promise.then is much more descriptive to me.

But I realize it might be my background, coming from Tasks in C# to Promises in JavaScript and then Observables, that is "polluting my mind". I also might have misunderstood this issue. Maybe you could elaborate, since you're the OP?

@headinthebox
Contributor

flatMap in RxJava is SelectMany in C#. Since you mention you know Tasks from C#, you should read this: http://blogs.msdn.com/b/pfxteam/archive/2013/04/03/tasks-monads-and-linq.aspx. Technically speaking, the Task API is a so-called co-monad, but since Tasks are continuations, Task can also be formulated as a monad. In Rx.NET we had ManySelect for a while, which corresponds to continueWith.

@benjchristensen
Member Author

What's "unflat" about a Scalar value?

An async continuation that returns another Future needs to be flattened:

aFuture.flatMap(value -> {
    return doWithThatReturnsAnotherFuture(value);
});

In that case the flat part is flattening the returned Future, otherwise you'd end up with a Future<Future<T>> rather than a Future<T>.

What part of the operation involves mapping of any kind

The function can map from T to R however you wish.

Future<R> Future.flatMap(Func1<T, Future<R>> mapper)

If there is no need to flatten then it would be map:

Future<R> Future.map(Func1<T, R> mapper)

If it is purely a side effect, then the naming is up in the air, but RxJava has so far used doOn* method names to signal side effects, so it would be something like doOnSuccess/doOnError on a scalar Future, which just passes the value through:

Future<T> Future.doOnSuccess(Action1<T> sideEffect)
Future<T> Future.doOnError(Action1<Throwable> sideEffect)
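
The nesting problem can be reproduced with the JDK's CompletableFuture, where thenApply plays the role of map and thenCompose the role of flatMap. This is only a sketch built on java.util.concurrent, not the proposed rx.Future API; `lookup` is a hypothetical stand-in for doWithThatReturnsAnotherFuture.

```java
import java.util.concurrent.CompletableFuture;

class FlatMapVsMapSketch {

    // Hypothetical async step that itself returns a future.
    static CompletableFuture<String> lookup(int id) {
        return CompletableFuture.completedFuture("user-" + id);
    }

    // map-style: the mapper returns a future, so the result is nested.
    static CompletableFuture<CompletableFuture<String>> nested(CompletableFuture<Integer> id) {
        return id.thenApply(FlatMapVsMapSketch::lookup);
    }

    // flatMap-style: thenCompose flattens the inner future away.
    static CompletableFuture<String> flattened(CompletableFuture<Integer> id) {
        return id.thenCompose(FlatMapVsMapSketch::lookup);
    }
}
```

With the nested variant a caller has to unwrap twice (`nested(id).join().join()`), while the flattened variant yields the value after a single `join()`.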

@Nilzor

Nilzor commented Nov 25, 2014

Ok thanks for taking the time to answer my questions, it made things clearer.

@Matthias247

I think the issue here is that flatMap describes what happens in general, whereas then or continueWith describes it more concretely for the Future type.
Some time ago I had no experience with functional programming, so flatMap didn't tell me anything. continueWith, however, is more self-describing, so I could understand it, and it's easier to discover in API documentation. I like it. I later learned that then/continueWith are equivalent to flatMap, so now I understand that too. Maybe the same applies to other users?
Why not expose both function signatures? I don't think it would be a problem, and I think it would make the API more familiar to OO as well as FP programmers.
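
Exposing both names could be as small as one delegating method. The sketch below assumes a hypothetical `ScalarFuture` wrapper around the JDK's CompletableFuture; neither the class nor its methods are part of RxJava or Bolts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical wrapper used only for illustration.
final class ScalarFuture<T> {
    private final CompletableFuture<T> delegate;

    private ScalarFuture(CompletableFuture<T> delegate) {
        this.delegate = delegate;
    }

    static <T> ScalarFuture<T> just(T value) {
        return new ScalarFuture<>(CompletableFuture.completedFuture(value));
    }

    // FP-style name: run the next async step and flatten its result.
    <R> ScalarFuture<R> flatMap(Function<? super T, ScalarFuture<R>> next) {
        CompletableFuture<R> composed = delegate.thenCompose(v -> next.apply(v).delegate);
        return new ScalarFuture<>(composed);
    }

    // Task/Promise-style alias: identical behavior under a more discoverable name.
    <R> ScalarFuture<R> continueWith(Function<? super T, ScalarFuture<R>> next) {
        return flatMap(next);
    }

    // Blocks until the value is available; convenient for demos and tests.
    T join() {
        return delegate.join();
    }
}
```

The cost of the alias is a larger API surface with two ways to do the same thing, which is the trade-off to weigh against discoverability.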

However I thought the more general question was whether RxJava would get a Task/Future like structure at all? I would still like to see it. Bolts looks good, but having a single library for all kinds of asynchronous things and with the possibility for interaction between structures (futures to observables and the other way around, sharing of schedulers, ...) would be even nicer.

@benjchristensen
Member Author

However I thought the more general question was whether RxJava would get a Task/Future like structure at all?

Yes, that's the more general question. I tend to think we should.

having a single library for all kinds of asynchronous things and with the possibility for interaction between structures ... would be even nicer.

Agreed, which is why I want it part of RxJava. Most everything would be shared, mostly just the type signature would differ.

@akarnokd
Member

akarnokd commented Feb 5, 2015

Can someone explain what this Future/Task should do, or more specifically, what is the underlying issue we want to solve that Observable or Async can't do (efficiency?)?

@benjchristensen
Member Author

Represent request/response of a single value. It is purely a design thing, not functional. Many, many people would prefer to represent scalar values such as IO request/response (think HTTP, REST, RPC) as a Future/Task but be able to merge them into Observable streams. It is one of the most common questions I get.

Rx.Net has it because .Net already has Task, which works nicely with Observable since they were built for each other. The Java Future is unusable in this sense, and CompletableFuture in Java 8 can be made to work, but its entire API design is incredibly different and they are always "hot".
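
To make "hot" concrete, here is a small sketch using only the JDK. The `fetch` method and the call counter are hypothetical; wrapping the future in a Supplier is just one way to approximate the lazy behavior of an Observable, not how RxJava implements it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class HotVsColdSketch {
    // Counts how many times the underlying work has been started.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical request.
    static String fetch() {
        calls.incrementAndGet();
        return "response";
    }

    // Hot: the work is scheduled as soon as the future is created,
    // whether or not anyone ever consumes the result.
    static CompletableFuture<String> hot() {
        return CompletableFuture.supplyAsync(HotVsColdSketch::fetch);
    }

    // Cold: nothing runs until get() is called, and each get() starts
    // the work again, much like each subscribe() on a cold Observable.
    static Supplier<CompletableFuture<String>> cold() {
        return () -> CompletableFuture.supplyAsync(HotVsColdSketch::fetch);
    }
}
```

Calling `hot()` starts `fetch` immediately, while calling `cold()` alone leaves the counter untouched until a caller invokes `get()`.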

@benjchristensen
Member Author

FYI, I plan on tackling this myself as we want to use it at Netflix for something.

@benjchristensen benjchristensen self-assigned this Feb 6, 2015
benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Feb 8, 2015
Adds `rx.Task` as a "scalar Observable" for representing work with a single return value.

See ReactiveX#1594 rx.Future/Task

This provides a type similar to `Future` in that it represents a scalar unit of work, but it is lazy like an `Observable` and many `Task`s can be combined into an `Observable` stream. Note how `Task.zip` returns `Task<R>` whereas `Task.merge` returns `Observable<R>`.

NOTE: This is for experimentation and feedback at this time.

Items requiring review and work that I'm particularly aware of:

- naming of `OnExecute`
- naming of `TaskObserver` (this one in particular I don't like)
- design and implementation of `Task.Promise`
- should the public `lift` use the `Observable.Operator` or should that only be for internal reuse?
- should we have a public `lift` that uses a `Task.Operator`?
- the `Task.toObservable` implementation right now is efficient but will likely break something so it likely needs to change to use `subscribe`
- implementation of this merge variant: `Task<T> merge(final Task<? extends Task<? extends T>> source)`
- several operators currently just wrap as `Observable` to reuse existing operators ... is that okay performance wise?
- Javadocs

Examples of using this class:

```java
import rx.Observable;
import rx.Task;
import rx.Task.Promise;

public class TaskExamples {

    public static void main(String... args) {
        // scalar synchronous value
        Task<String> t1 = Task.create(t -> {
            t.onSuccess("Hello World!");
        });

        // scalar synchronous value using helper method
        Task<Integer> t2 = Task.just(1);

        // synchronous error
        Task<String> error = Task.create(t -> {
            t.onError(new RuntimeException("failed!"));
        });

        // executing
        t1.subscribe(System.out::println);
        t2.subscribe(System.out::println);
        error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));

        // scalar Tasks for request/response like a Future
        getData(1).subscribe(System.out::println);
        getDataUsingPromise(2).subscribe(System.out::println);

        // combining Tasks into another Task
        Task<String> zipped = Task.zip(t1, t2, (a, b) -> a + " -- " + b);

        // combining Tasks into an Observable stream
        Observable<String> merged = Task.merge(t1, t2.map(String::valueOf), getData(3));
        Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));

        zipped.subscribe(v -> System.out.println("zipped => " + v));
        merged.subscribe(v -> System.out.println("merged => " + v));
        mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
    }

    /**
     * Example of an async scalar execution using Task.create
     * <p>
     * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
     *
     * @param arg
     * @return
     */
    public static Task<String> getData(int arg) {
        return Task.create(s -> {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                s.onSuccess("Data=" + arg);
            }).start();
        });
    }

    /**
     * Example of an async scalar execution using a Task.Promise
     * <p>
     * This shows how an eager (hot) process would work like using a Future.
     *
     * @param arg
     * @return
     */
    public static Task<String> getDataUsingPromise(int arg) {
        Task.Promise<String> p = Promise.create();

        new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            // deliver value
            p.onSuccess("Data=" + arg);
        }).start();

        return p.getTask();
    }
}
```
@mttkay
Contributor

mttkay commented Feb 9, 2015

I would also be curious to see what value this adds other than introducing a concept that developers familiar with other frameworks or languages are used to. So just to play devil's advocate: if Observable is semantically a superset of Task as it's capable of producing N results rather than just 1 (and N can be 1), does this justify introducing a new type? I am worried that this really only adds cognitive overhead and redundancy, as we're essentially dealing with the same thing, but one is a specialization of the other? Will people start asking when to use a Task and when to use an Observable?

I should add that we almost always use scalar Observables in our app, and I never thought that it felt awkward. An Observable.just(list) seems no more or less confusing to me than a Future.value(list) (to draw the comparison again to Twitter's composable Futures).

That said, is the actual problem we're trying to solve one of naming and expectation, because people are looking at Observable and expecting a Scalar, because that's what Futures do? So is the problem more discoverability of the API rather than a gap in functionality?

I do need to say here that if I had to pick one design aspect of RxJava that I think really stands out, then it's its universal applicability, and that's partially because of a healthy lack of assumptions--emitting a single result is an assumption--and I find this to be a powerful combination in library design.

@mttkay
Contributor

mttkay commented Feb 9, 2015

I think I meant to respond on the PR, as the discussion seemed to have moved there. Sorry.

@ylemoigne

I think it's confusing, because the first table at http://reactivex.io/intro.html says that for a single item, the concept to use is a Future.

@JakeWharton
Contributor

That is what this would be aiming to replace. Future is a really bad abstraction.

@markterm

Would obs.toSeq, obs.single return Task? Certainly I'd like to see that but we'd need to make sure that doesn't interfere with people who don't want to use Task ...

@sdeleuze

sdeleuze commented Apr 9, 2015

As said in the PR comments, I fully agree with the need for a scalar observable type, especially when exposing such a type in an application's public API.

The main thing that confuses me in the implementation proposal is the embedded Task.Promise type. I understand this is done differently in .NET since Task already exists, but on other platforms I would just expect a single type that I would expose in my API instead of Observable for a scalar result.

@benjchristensen
Member Author

I'm still interested in pursuing this. The only thing holding us up right now is a final decision on what we should call it.

It seems Single is the name we are converging on: #2641 (comment)

Any reason not to merge an @Experimental version of rx.Single?

@benjchristensen benjchristensen changed the title from rx.Future/Task to rx.Future/Task/Async/Single May 11, 2015
@headinthebox
Contributor

Let's go for it!

@akarnokd
Member

Single sounds odd to me. Here are my candidates: Scalar(Task, Observable, Future), Single(Task, Future), Async(Task, Scalar, Future), ObservableScalar; however, I suspect most combinations are already taken by some other library.

Edit: Just.

@headinthebox
Contributor

http://en.wikipedia.org/wiki/Scalar: "Variable (computing), or scalar, an atomic quantity that can hold only one value at a time". Why use a complex term like "scalar" when you just mean "single"?

@benjchristensen
Member Author

@akarnokd I don't quite understand what you are proposing with the brackets: "Scalar(Task, Observable, Future)". What is meant by the (Task, Observable, Future) after the "Scalar"?

I agree with @headinthebox on this that Scalar is not what we want here. Others agree: #2641 (comment) and #2641 (comment)

I also don't really like the idea of having a long name like SingleObservable. It can't be Async because the type can be either synchronous or asynchronous, just like an Observable. We can't use Future as that collides too much. We could use Task, though it collides with some other Task definitions, and with the C# usage which behaves differently. Also, semantically Task doesn't seem right as a "task" is typically intended to mean a "one-shot" execution whereas this type could be invoked multiple times.

This brings us back to Single again.

@akarnokd How about let's proceed with Single and let us all start using the code in the @Experimental state and over the next months of usage see if it feels right as an abstraction and name? Over the coming months we'll mature towards a final status in v1.1 or v1.2 and have a chance before then to change if we come to a different agreement?

@akarnokd
Member

@benjchristensen I meant potential postfixes, i.e., ScalarTask, ScalarObservable or ScalarFuture, etc.

Since we will use the experimental annotation, let's proceed with Single and come back to the name in a few months once enough use experience has been accumulated.

@schrepfler

I like Single but just to throw more alternatives, Observable1 or OneObservable can be considered to avoid mixing up with other class names/libraries.

@benjchristensen
Member Author

Here are examples of using Single:

import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Single;
import rx.Single.Promise;
import rx.schedulers.Schedulers;

public class SingleExperiment {

    public static void main(String... args) {
        Observable<String> a_merge_b = getDataA().mergeWith(getDataB());

        Single<String> a_and_b = getDataA().flatMap(t -> {
            return getDataB().map(r -> t + r);
        });

        Single<String> a_transformed = getDataA().map(a -> a + "_response");

        Observable<String> a_concat_b = getDataA().concatWith(getDataB());

        /**
         * This shows flatMapping a Single with an Observable
         */
        Observable<String> a_with_c = getDataA().flatMapObservable(a -> {
            return getDataC().map(c -> a + "-" + c);
        });

        Single<String> aWithTimeout = getDataA().timeout(100, TimeUnit.MILLISECONDS);

        Single<String> aWithTimeoutAndSimpleDefaultValue = aWithTimeout.onErrorReturn(t -> "DefaultValue");

        /**
         * We can use toObservable() for a Single to interact with an Observable.
         * Or we can add the various overloads to Observable.merge/concat/zip/mergeWith/concatWith/zipWith/etc
         * Adding the overloads though does result in a lot of API clutter.
         */
        getDataC().mergeWith(getDataA().toObservable());

    }

    public static Single<String> getDataA() {
        return Single.<String> create(o -> {
            o.onSuccess("DataA");
        }).subscribeOn(Schedulers.io());
    }

    public static Single<String> getDataB() {
        return Single.just("DataB")
                .subscribeOn(Schedulers.io());
    }

    public static Observable<String> getDataC() {
        return Observable.interval(10, TimeUnit.MILLISECONDS).map(i -> "c" + i);
    }

    /**
     * Example of Single.Promise
     */
    public static Single<String> getDataD() {
        Promise<String> p = Single.Promise.create();
        new Thread(() -> {
            p.onSuccess("data-d-from-thread");
        }).start();
        return p.getSingle();
    }
}

@sdeleuze

@benjchristensen I like this new proposal but could you explain why there is a need for this embedded Promise type?

@benjchristensen
Member Author

Reposting a modified version of #2641 (comment) to provide reasoning for this proposal:


The evolution of thinking after using RxJava for a couple years is that libraries that expose public APIs can benefit from being able to communicate if something is scalar or vector – despite the "functional" argument that a scalar is just a "list of one". We are able to choose to use T or List<T> when creating a synchronous API, so why not for async as well? In other words, why should the use of RxJava exclude communicating when we do know something is a scalar, which is a very common use case, and results in a simpler mental model?

When composition occurs that results in multiple values then the type changes to an Observable. Not all composition and transformation requires this though – zip, amb, map, flatMap all can return Single. Others like merge will return an Observable.
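
A rough sketch of why the result type has to change, again using the JDK's CompletableFuture as a stand-in scalar type. The `zip` and `merge` helpers here are hypothetical and are not the rx.Single operators; the point is only that zip still produces exactly one value, while merge produces one emission per source and therefore needs a multi-valued consumer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;

class ZipVsMergeSketch {

    // zip: two scalars in, one combined scalar out, so the scalar type is preserved.
    static <A, B, R> CompletableFuture<R> zip(CompletableFuture<A> a,
                                              CompletableFuture<B> b,
                                              BiFunction<A, B, R> combiner) {
        return a.thenCombine(b, combiner);
    }

    // merge: two scalars in, two emissions out (in completion order).
    // onNext may be invoked from different threads, so it should be thread-safe.
    // The returned future only signals that both sources have finished.
    static <T> CompletableFuture<Void> merge(CompletableFuture<T> a,
                                             CompletableFuture<T> b,
                                             Consumer<T> onNext) {
        return CompletableFuture.allOf(a.thenAccept(onNext), b.thenAccept(onNext));
    }
}
```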

Another consideration is that APIs that expose data access are generally separate from the consuming code that composes them, and it is in these data access APIs that the scalar Single is most effective at communicating via a static type such as:

Single<T> getData()

versus

Observable<T> getData()

does this actually simplify anything?

There is one thing I think it simplifies that is more than just making it easier due to familiarity. It communicates when a user does NOT need to think about multiple values, and that does indeed simplify code.

A Single will always behave in one of 3 ways:

  1. respond with an error
  2. never respond
  3. respond with a success

An Observable on the other hand may:

  1. respond with an error
  2. never respond
  3. respond successfully with no data and terminate
  4. respond successfully with a single value and terminate
  5. respond successfully with multiple values and terminate
  6. respond successfully with one or more values and never terminate (waiting for more data)

The mental model for these extra possibilities definitely does increase the complexity, and it is a big part of the pushback against using RxJava that I have seen for request/response models.

These more complex models do still end up happening as soon as composition occurs, but now that composition is declared by the consumer, and the consumer knows the contract of the producer to be simpler if it is a Single.
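
The narrower contract can be sketched as a type whose subscriber has only two callbacks and which delivers at most one of them. Everything below (`SingleSketch`, `Subscriber`, `OnSubscribe`) is a hypothetical illustration with assumed names; it is not the rx.Single implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical minimal scalar type: success, error, or nothing at all.
final class SingleSketch<T> {

    interface Subscriber<T> {
        void onSuccess(T value);
        void onError(Throwable error);
    }

    interface OnSubscribe<T> {
        void call(Subscriber<T> subscriber);
    }

    private final OnSubscribe<T> source;

    private SingleSketch(OnSubscribe<T> source) {
        this.source = source;
    }

    static <T> SingleSketch<T> create(OnSubscribe<T> source) {
        return new SingleSketch<>(source);
    }

    // Lazy: the source only runs when subscribe is called.
    void subscribe(Consumer<? super T> successHandler, Consumer<Throwable> errorHandler) {
        AtomicBoolean done = new AtomicBoolean();
        source.call(new Subscriber<T>() {
            @Override
            public void onSuccess(T value) {
                // Only the first terminal signal is delivered.
                if (done.compareAndSet(false, true)) {
                    successHandler.accept(value);
                }
            }

            @Override
            public void onError(Throwable error) {
                if (done.compareAndSet(false, true)) {
                    errorHandler.accept(error);
                }
            }
        });
    }
}
```

There is no onNext or onCompleted to reason about, so a consumer never has to handle the empty, multi-value, or infinite cases from the Observable list.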

For example, in "service layer" type code we could have getters like this:

Single<T> getDataA(arg);
Single<R> getDataB(arg);
Observable<R> getDataC(arg);

This is now simpler to understand because the types narrow the scope for A and B so we know we don't need to consider multi-value or infinite cases (which generally need to be explained in the javadoc). The C type in contrast clearly indicates that it wants to support multiple values otherwise it would have just been a Single.

familiarity can always be learned, but complexity is harder to avoid

My thinking has evolved to think that trying to force everything to be a vector results in increased complexity of a different kind that can be addressed by a slight increase in complexity of the RxJava APIs (2 types instead of 1). Static types communicate common information that simplifies the mental model and constrains available states and operations that can be applied. This seems worth the increase in API surface area.

@benjchristensen
Member Author

could you explain why there is a need for this embedded Promise type

@sdeleuze it is not required and I should probably remove it. I added it to prove a common programming model.

Below are two ways of achieving the same thing. The first eliminates nesting but does involve an extra type. However, Promises are "hot" whereas the Single.create approach supports hot and cold execution.

    public static Single<String> getDataD() {
        Promise<String> p = Single.Promise.create();
        doStuffWithCallback(d -> p.onSuccess(d + "-from-thread"));
        return p.getSingle();
    }

    public static Single<String> getDataE() {
        return Single.create(o -> {
            doStuffWithCallback(d -> {
                o.onSuccess(d + "-from-thread");
            });
        });
    }

benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Jun 9, 2015
Adds `rx.Single` as an "Observable Future" for representing work with a single return value.

See ReactiveX#1594 rx.Future/Task/Async/Single

This provides a type similar to `Future` in that it represents a scalar unit of work, but it is lazy like an `Observable` and many `Single`s can be combined into an `Observable` stream. Note how `Single.zip` returns `Single<R>` whereas `Single.merge` returns `Observable<R>`.

Examples of using this class:

```java
import rx.Observable;
import rx.Single;

public class TaskExamples {

    public static void main(String... args) {
        // scalar synchronous value
        Single<String> t1 = Single.create(t -> {
            t.onSuccess("Hello World!");
        });

        // scalar synchronous value using helper method
        Single<Integer> t2 = Single.just(1);

        // synchronous error
        Single<String> error = Single.create(t -> {
            t.onError(new RuntimeException("failed!"));
        });

        // executing
        t1.subscribe(System.out::println);
        t2.subscribe(System.out::println);
        error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));

        // scalar Singles for request/response like a Future
        getData(1).subscribe(System.out::println);

        // combining Singles into another Single
        Single<String> zipped = Single.zip(t1, t2, (a, b) -> a + " -- " + b);

        // combining Singles into an Observable stream
        Observable<String> merged = Single.merge(t1, t2.map(String::valueOf), getData(3));
        Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));

        zipped.subscribe(v -> System.out.println("zipped => " + v));
        merged.subscribe(v -> System.out.println("merged => " + v));
        mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
    }

    /**
     * Example of an async scalar execution using Single.create
     * <p>
     * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
     *
     * @param arg
     * @return
     */
    public static Single<String> getData(int arg) {
        return Single.create(s -> {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                s.onSuccess("Data=" + arg);
            }).start();
        });
    }

}
```
benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Jun 10, 2015
Adds `rx.Single` as an "Observable Future" for representing work with a single return value.

See ReactiveX#1594 rx.Future/Task/Async/Single

This provides a type similar to `Future` in that it represents a scalar unit of work, but it is lazy like an `Observable` and many `Single`s can be combined into an `Observable` stream. Note how `Single.zip` returns `Single<R>` whereas `Single.merge` returns `Observable<R>`.

Examples of using this class:

```java
import rx.Observable;
import rx.Single;

public class TaskExamples {

    public static void main(String... args) {
        // scalar synchronous value
        Single<String> t1 = Single.create(t -> {
            t.onSuccess("Hello World!");
        });

        // scalar synchronous value using helper method
        Single<Integer> t2 = Single.just(1);

        // synchronous error
        Single<String> error = Single.create(t -> {
            t.onError(new RuntimeException("failed!"));
        });

        // executing
        t1.subscribe(System.out::println);
        t2.subscribe(System.out::println);
        error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));

        // scalar Singles for request/response like a Future
        getData(1).subscribe(System.out::println);

        // combining Singles into another Single
        Single<String> zipped = Single.zip(t1, t2, (a, b) -> a + " -- " + b);

        // combining Singles into an Observable stream
        Observable<String> merged = Single.merge(t1, t2.map(String::valueOf), getData(3));
        Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));

        zipped.subscribe(v -> System.out.println("zipped => " + v));
        merged.subscribe(v -> System.out.println("merged => " + v));
        mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
    }

    /**
     * Example of an async scalar execution using Single.create
     * <p>
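     * This shows the lazy, idiomatic Rx approach: it behaves like an Observable, except that it is scalar.
     *
     * @param arg value to include in the emitted string
     * @return a Single that emits "Data=" + arg after a 500 ms delay on a new thread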
     */
    public static Single<String> getData(int arg) {
        return Single.create(s -> {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                s.onSuccess("Data=" + arg);
            }).start();
        });
    }

}
```
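
The "lazy like an `Observable`" point above can be illustrated without RxJava on the classpath. The following is a minimal sketch using only the JDK: `LazySingle` is a hypothetical stand-in written for this illustration (it is not the `rx.Single` implementation), contrasted with `CompletableFuture.supplyAsync`, which schedules its work at creation time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class LazyVersusEager {

    /** Hypothetical lazy scalar type for illustration; NOT the rx.Single implementation. */
    static final class LazySingle<T> {
        private final Consumer<Consumer<T>> producer;

        LazySingle(Consumer<Consumer<T>> producer) {
            this.producer = producer;
        }

        /** Runs the producer now; every call runs it again. */
        void subscribe(Consumer<T> onSuccess) {
            producer.accept(onSuccess);
        }
    }

    /** Returns {eager runs after creation, lazy runs after creation, lazy runs after two subscribes}. */
    static int[] run() {
        AtomicInteger eagerRuns = new AtomicInteger();
        AtomicInteger lazyRuns = new AtomicInteger();

        // Eager: supplyAsync schedules the work as soon as the future is created.
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "run " + eagerRuns.incrementAndGet());
        future.join(); // only waits for completion; no callback was ever registered
        int eagerAfterCreate = eagerRuns.get();

        // Lazy: constructing the value only stores the producer.
        LazySingle<String> single = new LazySingle<>(onSuccess ->
                onSuccess.accept("run " + lazyRuns.incrementAndGet()));
        int lazyAfterCreate = lazyRuns.get();

        // Each subscription triggers the work again.
        single.subscribe(v -> { });
        single.subscribe(v -> { });
        int lazyAfterSubscribes = lazyRuns.get();

        return new int[] { eagerAfterCreate, lazyAfterCreate, lazyAfterSubscribes };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("eager runs after creation: " + r[0]);
        System.out.println("lazy runs after creation: " + r[1]);
        System.out.println("lazy runs after two subscribes: " + r[2]);
    }
}
```

In this sketch the future's work has run once before anyone asks for the result, while the lazy type runs nothing until `subscribe` is called and then runs once per subscription. That is the property the commit message attributes to `Single`, shown here only by analogy.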
@benjchristensen
Member Author

An `@Experimental` version was merged in #3012.
