Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rx.Single #3012

Merged
merged 1 commit into from
Jun 10, 2015
Merged

rx.Single #3012

merged 1 commit into from
Jun 10, 2015

Conversation

benjchristensen
Copy link
Member

Adds rx.Single as an "Observable Future" for representing work with a single return value.

See #1594 rx.Future/Task/Async/Single

This provides a type similar to Future in that it represents a scalar unit of work, but it is lazy like an Observable and many Singles can be combined into an Observable stream. Note how Single.zip returns Single<R> whereas Single.merge returns Observable<R>.

Examples of using this class:

import rx.Observable;
import rx.Single;

public class TaskExamples {

    public static void main(String... args) {
        // scalar synchronous value
        Single<String> t1 = Single.create(t -> {
            t.onSuccess("Hello World!");
        });

        // scalar synchronous value using helper method
        Single<Integer> t2 = Single.just(1);

        // synchronous error
        Single<String> error = Single.create(t -> {
            t.onError(new RuntimeException("failed!"));
        });

        // executing
        t1.subscribe(System.out::println);
        t2.subscribe(System.out::println);
        error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));

        // scalar Singles for request/response like a Future
        getData(1).subscribe(System.out::println);

        // combining Tasks into another Task
        Single<String> zipped = Single.zip(t1, t2, (a, b) -> a + " -- " + b);

        // combining Singles into an Observable stream
        Observable<String> merged = Single.merge(t1, t2.map(String::valueOf), getData(3));
        Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));

        zipped.subscribe(v -> System.out.println("zipped => " + v));
        merged.subscribe(v -> System.out.println("merged => " + v));
        mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
    }

    /**
     * Example of an async scalar execution using Single.create
     * <p>
     * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
     *
     * @param arg
     * @return
     */
    public static Single<String> getData(int arg) {
        return Single.create(s -> {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                    s.onSuccess("Data=" + arg);
                }).start();
        });
    }

}

@akarnokd
Copy link
Member

akarnokd commented Jun 9, 2015

I don't see any way to cancel a subscription to a Single.

@benjchristensen
Copy link
Member Author

When using subscribe(Subscriber s) then the Subscriber can be unsubscribed, just like an Observable. This is how timeout works to cancel it.

The SingleObserver doesn't expose this, similar to Observer.

We don't currently have something like take or takeUntil, as I felt it was odd to have something called take when only a single response can be emitted. I think takeUntil(Single/Observable) is a valid operator for a Single though.

The subscribe methods have a void return type as per the intentions for v2 instead of returning Subscription like Observable.subscribe does. We could make them return Subscription though.

What do you think should be done?

@akarnokd
Copy link
Member

akarnokd commented Jun 9, 2015

The SingleObserver doesn't expose this, similar to Observer.

but Observable.subscribe() returns a Subscription for them.

This throws NPE:

Single.create(s -> {
    new Thread(() -> {
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // deliver value
            s.onSuccess("Data=" + arg);
    }).start();
}).timeout(1, TimeUnit.SECONDS).subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);

Exception in thread "main" java.lang.NullPointerException
    at rx.Single.toObservable(Single.java:232)
    at rx.Single.timeout(Single.java:1817)
    at rx.Single.timeout(Single.java:1740)
    at SingleTest.main(SingleTest.java:36)

If I use the other overload of timeout, it quits only after 5 seconds, instead of 2:

public static void main(String[] args) throws Exception {
    Single.create(s -> {
        new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            // deliver value
             s.onSuccess("Data");
        }).start();
    }).timeout(1, TimeUnit.SECONDS, Single.just("Alt"))
    .subscribe(System.out::println, Throwable::printStackTrace);
    Thread.sleep(2000);
}

@benjchristensen
Copy link
Member Author

but Observable.subscribe() returns a Subscription for them.

See what I said about that in the previous comment. In 2.x we intend on not returning Subscription so I chose not to do that here. Instead unsubscription should happen from within the stream, such as via a takeUntil operator.

@benjchristensen
Copy link
Member Author

This throws NPE

Yup, it certainly did :-) Unit test added. I had a null not being handled.

@benjchristensen
Copy link
Member Author

These tests are passing and not taking 5 seconds:

    @Test
    public void testTimeout() {
        TestSubscriber<String> ts = new TestSubscriber<String>();
        Single<String> s = Single.create(new OnSubscribe<String>() {

            @Override
            public void call(SingleObserver<? super String> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // ignore as we expect this for the test
                }
                s.onSuccess("success");
            }

        }).subscribeOn(Schedulers.io());

        s.timeout(100, TimeUnit.MILLISECONDS).subscribe(ts);

        ts.awaitTerminalEvent();
        ts.assertError(TimeoutException.class);
    }

    @Test
    public void testTimeoutWithFallback() {
        TestSubscriber<String> ts = new TestSubscriber<String>();
        Single<String> s = Single.create(new OnSubscribe<String>() {

            @Override
            public void call(SingleObserver<? super String> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // ignore as we expect this for the test
                }
                s.onSuccess("success");
            }

        }).subscribeOn(Schedulers.io());

        s.timeout(100, TimeUnit.MILLISECONDS, Single.just("hello")).subscribe(ts);

        ts.awaitTerminalEvent();
        ts.assertNoErrors();
        ts.assertValue("hello");
    }

screen shot 2015-06-09 at 4 20 40 pm

@benjchristensen
Copy link
Member Author

With code in its current shape, intended for experimentation, and avoiding any controversial public APIs, shall we proceed with merging it?

@headinthebox
Copy link
Contributor

Ship that puppy.

@davidmoten
Copy link
Collaborator

No backpressure?

@akarnokd
Copy link
Member

subscribeOn(Schedulers.io())

Saved by the interrupt. But what if Single isn't subscribed to on one of the standard Schedulers, or it is subscribed to from a scheduler which doesn't call Future.cancel() with true?

Besides, the programming advantage of Subscriber is that one can add resources to it which will get unsubscribed automatically.

So instead of SingleObserver, I suggest the following interface:

interface SingleSubscriber<T> {
    void onSubscribe(Subscription s);
    void onSuccess(T value);
    void onError(Throwable ex);
}

Single.create(s -> {
    Thread t = new Thread(() -> {
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // deliver value
        s.onSuccess("Data");
    });
    s.onSubscribe(Subscription.create(() -> t.interrupt());
    t.start();
})

Single.create(s -> {
    SubscriptionList slist = new SubscriptionList();
    s.onSubscribe(slist);
    try (InputStream in = new FileInputStream("file.dat")) {
        slist.add(Subscriptions.create(() -> Closeables.closeSilently(io)));
        byte[] data = new byte[in.available()];
        in.read(data);
        s.onSuccess(data);
    } catch (IOException ex) {
       if (!slist.isUnsubscribed()) {
           s.onError(ex);
       }
    }
});

private static <T> Observable<T> toObservable(Single<T> t) {
// is this sufficient, or do I need to keep the outer Single and subscribe to it?
return Observable.create(t.onSubscribe);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't this a public instance method? I'd like to write someSingle.toObservable().
And symmetrically, there could be a toSingle() method in Observable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only because I'm being cautious in what I add to the public API. We can always add, but we can't take things away.

This static method exists only because I'm using it to implement the various operators.

I'm open to adding an instance toObservable() method.

@benjchristensen
Copy link
Member Author

I suggest the following interface

Interesting. I'm not thrilled though by the implicit contract this creates unless invoking onSubscribe is completely optional. But if it's optional it would also be awkward.

If we're going this route we should just skip the SingleObserver and do a SingleSubscriber just like the normal Subscriber and always have it injected in the top as a cancellation token so a Single can register unsubscribe behavior with it. That way there isn't an awkward negotiation requirement.

I'll make changes to accommodate this and ask for your feedback.

@benjchristensen
Copy link
Member Author

No backpressure?

For a single valued emission, no I don't think it should care. It significantly complicates things and I think it's silly to worry about backpressure and flow control on a type that emits a single notification, either onError or onSuccess. Don't subscribe to it if a single emission can't be handled.

When it gets composed however, then the backpressure semantics of Observable would kick in, such as with merge when many Singles are merged together.

@akarnokd
Copy link
Member

No backpressure?

I guess @davidmoten meant where the Single and Observable meet, for example in toObservable() where you should use SingleDelayedProducer to introduce the correct backpessure interop.

@akarnokd
Copy link
Member

I'm not thrilled though by the implicit contract this creates unless invoking onSubscribe is completely optional. But if it's optional it would also be awkward.

It follows the reactive-streams concept. Without it, the cancellation wouldn't compose through and you'd rely only upon the ability of subscribeOn to interrupt an Rx thread. I'd totally go with the SingleObserver/SingleSubscriber pair if subscribe(SingleObserver) returned Subscription at least (and SingleSubscriber could extend Subscription as well).

@benjchristensen
Copy link
Member Author

It follows the reactive-streams concept.

I know, but I don't like mixing patterns in v1. It would be odd to have Observable handle cancellation one way and Single do it another way.

Take a look at the code now. It has a SingleSubscriber instead of SingleObserver and composes cancellation that same way as Observable since SingleSubscriber implements Subscription.

@benjchristensen
Copy link
Member Author

where you should use SingleDelayedProducer to introduce the correct backpessure interop.

Ah okay, I can buy into that ... so like this:

    protected Single(final OnSubscribe<T> f) {
        // bridge between OnSubscribe (which all Operators and Observables use) and OnExecute (for Single)
        this.onSubscribe = new Observable.OnSubscribe<T>() {

            @Override
            public void call(final Subscriber<? super T> child) {
                final SingleDelayedProducer<T> producer = new SingleDelayedProducer<T>(child);
                child.setProducer(producer);
                SingleSubscriber<T> ss = new SingleSubscriber<T>() {

                    @Override
                    public void onSuccess(T value) {
                        producer.setValue(value);
                    }

                    @Override
                    public void onError(Throwable error) {
                        child.onError(error);
                    }

                };
                child.add(ss);
                f.call(ss);
            }

        };
    }

It still feels like a lot of overhead for something that should never need backpressure :-/ If this shows poor performance I would drop this and just leave it to things like merge to always handle receiving 1 value per Single.

@benjchristensen
Copy link
Member Author

See test for backpressure here: 7876b0c#diff-6e59701fc9262bf5107abc5d12d7a928R375

for unsubscribe: 7876b0c#diff-6e59701fc9262bf5107abc5d12d7a928R252

@akarnokd
Copy link
Member

It still feels like a lot of overhead for something that should never need backpressure

I thought its easier to change toObservable and introduce the backpressure logic there instead of at the very root.

If this shows poor performance I would drop this and just leave it

I haven't benchmarked this yet, but I believe Single already maximizes the overhead around delivering values. Based on the latest merge() benchmark where just(0).flatMap(v -> just(0)) delivers ~6M values, a stream of 1000 values, in total, delivers 45/76M values per second.

@benjchristensen
Copy link
Member Author

maximizes the overhead around delivering values

Comparing with a stream of 1000 is not a valid comparison. A Future or at most an Observable of a single value is a valid comparison.

The reason a Single would be used is because the data source is not represented by a stream of 1000 values.

Note that Single does not yet have any optimizations done. For example, we could use similar approaches as ScalarSynchronousObservable for Single.just.

I thought its easier to change toObservable

But then the test cases I provided wouldn't work where a Subscriber is used with request(0). So it depends on what you're trying to solve for.

Look at what toObservable is doing and you'll notice that a Single is just a different type around the same Observable.OnSubscribe internally. This is done on purpose to allow reusing Observable operators.

If we agree with the public API of SingleSubscriber then the rest of this design is internal implementation details (I think) so we can move forward and always change it later.

Looking at the code as it currently stands, is there anything more you want changed before proceeding with it?

@akarnokd
Copy link
Member

Comparing with a stream of 1000 is not a valid comparison.

So I guess delivering 1000 items through AbstractOnSubscribe one at a time isn't really comparable with AbstractProducer emitting the same amount when requested all at once.

Regardless, If the use case for Single will be to create 1000s of them to deliver a single value each, then you'd better off restructuring the business logic around an (I hoped I could avoid using the e-word) event-bus.

The reason a Single would be used is because the data source is not represented by a stream of 1000 values.

It really depends on how long the computation takes to become available: if it is in microsecond or millisecond range, then the few dozen nanoseconds CAS takes inside the SingleDelayedProducer.setValue doesn't really matter.

Looking at the code as it currently stands, is there anything more you want changed before proceeding with it?

So I'm guessing there won't be any unsubscription possibility if one uses the non- Subscriber or SingleSubscriber-based subscribe() methods?

Note that Single does not yet have any optimizations done.

Perhaps you could include benchmarks already so the improvements can be measured.

@benjchristensen
Copy link
Member Author

then you'd better off restructuring the business logic

Sure, but that's why we have different abstractions, types and tools.

It really depends on how long the computation takes to become available

The point of Single is to represent a single async unit of work, typically IO. If it is microseconds it is most certain being done "the wrong way", similar to putting fine grained work on an Executor and receiving a Future.

So I'm guessing there won't be any unsubscription possibility if one uses the non- Subscriber or SingleSubscriber-based subscribe() methods?

I guess we can make those return Subscription since that's how Observable works. I'll change that before merging.

Adds `rx.Single` as an "Observable Future" for representing work with a single return value.

See ReactiveX#1594 rx.Future/Task/Async/Single

This provides a type similar to `Future` in that it represents a scalar unit of work, but it is lazy like an `Observable` and many `Single`s can be combined into an `Observable` stream. Note how `Single.zip` returns `Single<R>` whereas `Single.merge` returns `Observable<R>`.

Examples of using this class:

```java
import rx.Observable;
import rx.Single;

public class TaskExamples {

    public static void main(String... args) {
        // scalar synchronous value
        Single<String> t1 = Single.create(t -> {
            t.onSuccess("Hello World!");
        });

        // scalar synchronous value using helper method
        Single<Integer> t2 = Single.just(1);

        // synchronous error
        Single<String> error = Single.create(t -> {
            t.onError(new RuntimeException("failed!"));
        });

        // executing
        t1.subscribe(System.out::println);
        t2.subscribe(System.out::println);
        error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));

        // scalar Singles for request/response like a Future
        getData(1).subscribe(System.out::println);

        // combining Tasks into another Task
        Single<String> zipped = Single.zip(t1, t2, (a, b) -> a + " -- " + b);

        // combining Singles into an Observable stream
        Observable<String> merged = Single.merge(t1, t2.map(String::valueOf), getData(3));
        Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));

        zipped.subscribe(v -> System.out.println("zipped => " + v));
        merged.subscribe(v -> System.out.println("merged => " + v));
        mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
    }

    /**
     * Example of an async scalar execution using Single.create
     * <p>
     * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
     *
     * @param arg
     * @return
     */
    public static Single<String> getData(int arg) {
        return Single.create(s -> {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                    s.onSuccess("Data=" + arg);
                }).start();
        });
    }

}
```
@benjchristensen
Copy link
Member Author

Perf, as expected, is poor for representing microsecond scale computations, but totally fine for IO related request/response which this is modeling:

Benchmark                                               Mode   Samples        Score  Score error    Units
r.SinglePerfBaseline.newSingleAndSubscriberEachTime    thrpt         5  8303803.489   779599.284    ops/s
r.SinglePerfBaseline.singleConsumption                 thrpt         5  8409729.682   396871.179    ops/s
r.SinglePerfBaseline.singleConsumptionUnsafe           thrpt         5 15629012.609  1838899.288    ops/s

Also note that we are maintaining the same performance hit of SafeSubscriber and punting on solving that until 2.x as it is so baked into how Subscriber and Observable work.

@benjchristensen
Copy link
Member Author

Proceeding with merge. It is all marked as @Experimental and I am not releasing 1.0.13 yet.

benjchristensen added a commit that referenced this pull request Jun 10, 2015
@benjchristensen benjchristensen merged commit 32179c6 into ReactiveX:1.x Jun 10, 2015
@benjchristensen benjchristensen deleted the single-type branch June 10, 2015 19:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants