0.17.0
Version 0.17.0 contains some significant signature changes that allow us to significantly improve handling of synchronous Observables and simplify Schedulers. Many of the changes have backwards compatible deprecated methods to ease the migration while some are breaking.
The new signatures related to Observable
in this release are:
// A new create method takes `OnSubscribe` instead of `OnSubscribeFunc`
public final static <T> Observable<T> create(OnSubscribe<T> f)
// The new OnSubscribe type accepts a Subscriber instead of Observer and does not return a Subscription
public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>
// Subscriber is an Observer + Subscription
public abstract class Subscriber<T> implements Observer<T>, Subscription
// The main `subscribe` behavior receives a Subscriber instead of Observer
public final Subscription subscribe(Subscriber<? super T> subscriber)
// Subscribing with an Observer however is still appropriate
// and the Observer is automatically converted into a Subscriber
public final Subscription subscribe(Observer<? super T> observer)
// A new 'lift' function allows composing Operator implementations together
public <R> Observable<R> lift(final Operator<? extends R, ? super T> lift)
// The `Operator` used with `lift`
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
Also changed is the Scheduler
interface which is much simpler:
public abstract class Scheduler {
public Subscription schedule(Action1<Scheduler.Inner> action);
public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
public final Subscription scheduleRecursive(final Action1<Recurse> action)
public long now();
public int degreeOfParallelism();
public static class Inner implements Subscription {
public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Scheduler.Inner> action);
public long now();
}
public static final class Recurse {
public final void schedule();
public final void schedule(long delay, TimeUnit unit);
}
}
This release applies many lessons learned over the past year and seeks to streamline the API before we hit 1.0.
As shown in the code above the changes fall into 2 major sections:
1) Lift/Operator/OnSubscribe/Subscriber
Changes that allow unsubscribing from synchronous Observables without needing to add concurrency.
2) Schedulers
Simplification of the Scheduler
interface and make clearer the concept of "outer" and "inner" Schedulers for recursion.
Lift/Operator/OnSubscribe/Subscriber
New types Subscriber
and OnSubscribe
along with the new lift
function have been added. The reasons and benefits are as follows:
1) Synchronous Unsubscribe
RxJava versions up until 0.16.x are unable to unsubscribe from a synchronous Observable such as this:
Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Observer<? super Integer> Observer) {
for (int i = 1; i < 1000000; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
Subscribing to this Observable
will always emit all 1,000,000 values even if unsubscribed such as via oi.take(10)
.
Version 0.17.0 fixes this issue by injecting the Subscription
into the OnSubscribe
function to allow code like this:
Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
// we now receive a Subscriber instead of Observer
for (int i = 1; i < 1000000; i++) {
// the OnSubscribe can now check for isUnsubscribed
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
Subscribing to this will now correctly only emit 10 onNext
and unsubscribe:
// subscribe with an Observer
oi.take(10).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
println("Received: " + t);
}
})
Or the new Subscriber
type can be used and the Subscriber
itself can unsubscribe
:
// or subscribe with a Subscriber which supports unsubscribe
oi.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
println("Received: " + t);
if(t >= 10) {
// a Subscriber can unsubscribe
this.unsubscribe();
}
}
})
2) Custom Operator Chaining
Because Java doesn't support extension methods, the only approach to applying custom operators without getting them added to rx.Observable
is using static methods. This has meant code like this:
MyCustomerOperators.operate(observable.map(...).filter(...).take(5)).map(...).subscribe()
In reality we want:
observable.map(...).filter(...).take(5).myCustomOperator().map(...).subscribe()
Using the newly added lift
we can get quite close to this:
observable.map(...).filter(...).take(5).lift(MyCustomOperator.operate()).map(...).subscribe()
Here is how the proposed lift
method looks if all operators were applied with it:
Observable<String> os = OBSERVABLE_OF_INTEGERS.lift(TAKE_5).lift(MAP_INTEGER_TO_STRING);
Along with the lift
function comes a new Operator
signature:
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
All operator implementations in the rx.operators
package will over time be migrated to this new signature.
NOTE: Operators that have not yet been migrated do not work with synchronous unsubscribe.
3) Simpler Operator Implementations
The lift
operator injects the necessary Observer
and Subscription
instances (via the new Subscriber
type) and eliminates (for most use cases) the need for manual subscription management. Because the Subscription
is available in-scope there are no awkward coding patterns needed for creating a Subscription
, closing over it and returning and taking into account synchronous vs asynchronous.
For example, the body of fromIterable
is simply:
public void call(Subscriber<? super T> o) {
for (T i : is) {
if (o.isUnsubscribed()) {
return;
}
o.onNext(i);
}
o.onCompleted();
}
The take
operator is:
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final CompositeSubscription parent = new CompositeSubscription();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
}
child.add(parent);
return new Subscriber<T>(parent) {
int count = 0;
boolean completed = false;
@Override
public void onCompleted() {
if (!completed) {
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (!completed) {
child.onError(e);
}
}
@Override
public void onNext(T i) {
if (!isUnsubscribed()) {
child.onNext(i);
if (++count >= limit) {
completed = true;
child.onCompleted();
unsubscribe();
}
}
}
};
}
4) Recursion/Loop Performance with Unsubscribe
The fromIterable
use case is 20x faster when implemented as a loop instead of recursive scheduler (see a18b8c1).
Several places we can remove recursive scheduling used originally for unsubscribe support and use a loop instead.
Schedulers
Schedulers were greatly simplified to a design based around Action1<Inner>
.
public abstract class Scheduler {
public Subscription schedule(Action1<Scheduler.Inner> action);
public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
public final Subscription scheduleRecursive(final Action1<Recurse> action)
public long now();
public int degreeOfParallelism();
public static class Inner implements Subscription {
public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Scheduler.Inner> action);
public long now();
}
public static final class Recurse {
public final void schedule();
public final void schedule(long delay, TimeUnit unit);
}
}
This design change originated from three findings:
- It was very easy to cause memory leaks or inadvertent parallel execution since the distinction between outer and inner scheduling was not obvious.
To solve this the new design explicitly has the outer Scheduler
and then Scheduler.Inner
for recursion.
- The passing of state is not useful since scheduling over network boundaries with this model does not work.
In this new design all state passing signatures have been removed. This was determined while implementing a RemoteScheduler
that attempted to use observeOn
to transition execution from one machine to another. This does not work because of the requirement for serializing/deserializing the state of the entire execution stack. Migration of work over the network has been bound to be better suited to explicit boundaries established by Subjects. Thus, the complications within the Schedulers are unnecessary.
- The number of overloads with different ways of doing the same things were confusing.
This new design removes all but the essential and simplest methods.
- A scheduled task could not do work in a loop and easily be unsubscribed which generally meant less efficient recursive scheduling.
This new design applies similar principles as done with lift
/create
/OnSubscribe
/Subscriber
and injects the Subscription
via the Inner
interface so a running task can check isUnsubscribed()
.
WIth this new design, the simplest execution of a single task is:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
doWork();
}
});
Recursion is easily invoked like this:
Schedulers.newThread().scheduleRecursive(new Action1<Recurse>() {
@Override
public void call(Recurse recurse) {
doWork();
// recurse until unsubscribed (the schedule will do nothing if unsubscribed)
recurse.schedule();
}
});
or like this if the outer and inner actions need different behavior:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
doWork();
// recurse until unsubscribed (the schedule will do nothing if unsubscribed)
inner.schedule(this);
}
});
The use of Action1<Inner>
on both the outer and inner levels makes it so recursion that refer to this
and it works easily.
Similar to the new lift
/create
pattern with Subscriber
the Inner
is also a Subscription
so it allows efficient loops with unsubscribe
support:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
while(!inner.isUnsubscribed()) {
doWork();
}
}
});
An action can now unsubscribe
the Scheduler.Inner
:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
while(!inner.isUnsubscribed()) {
int i = doOtherWork();
if(i > 100) {
// an Action can cause the Scheduler to unsubscribe and stop
inner.unsubscribe();
}
}
}
});
Typically just stopping is sufficient:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
int i = doOtherWork();
if (i < 10) {
// recurse until done 10
inner.schedule(this);
}
}
});
but if other work in other tasks is being done and you want to unsubscribe conditionally you could:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
int i = doOtherWork();
if (i < 10) {
// recurse until done 10
inner.schedule(this);
} else {
inner.unsubscribe();
}
}
});
and the recursion can be delayed:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
doWork();
// recurse until unsubscribed ... but delay the recursion
inner.schedule(this, 500, TimeUnit.MILLISECONDS);
}
});
The same pattern works with the Recurse
signature:
Schedulers.newThread().scheduleRecursive(new Action1<Recurse>() {
@Override
public void call(Recurse recurse) {
doWork();
// recurse until unsubscribed (the schedule will do nothing if unsubscribed)
recurse.schedule(500, TimeUnit.MILLISECONDS);
}
});
The methods on the Inner
never return a Subscription
because they are always a single thread/event-loop/actor/etc and controlled by the Subscription
returned by the initial Scheduler.schedule
method. This is part of clarifying the contract.
Thus an unsubscribe
controlled from the outside would be done like this:
Subscription s = Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
while(!inner.isUnsubscribed()) {
doWork();
}
}
});
// unsubscribe from outside
s.unsubscribe();
Migration Path
1) Lift/OnSubscribe/Subscriber
The lift
function will not be used by most and is additive so will not affect backwards compatibility. The Subscriber
type is also additive and for most use cases does not need to be used directly, the Observer
interface can continue being used.
The previous create(OnSubscribeFunc f)
signature has been deprecated so code will work but now have warnings. Please begin migrating code as this will be deleted prior to the 1.0 release.
Code such as this:
Observable.create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> o) {
o.onNext(1);
o.onCompleted();
return Subscriptions.empty();
}
});
should change to this:
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
});
If concurrency was being injected to allow unsubscribe support:
Observable.create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(final Observer<? super Integer> o) {
final BooleanSubscription s = new BooleanSubscription();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (!s.isUnsubscribed()) {
o.onNext(i++);
}
}
});
t.start();
return s;
}
});
you may no longer need it and can implement like this instead:
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
int i = 0;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i++);
}
}
});
or if the concurreny is still desired you can simplify the Subscription
management:
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriber) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i++);
}
}
});
t.start();
}
});
or use subscribeOn
which now works to make synchronous Observables
async while supporting unsubscribe
(this didn't work before):
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
int i = 0;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i++);
}
}
}).subscribeOn(Schedulers.newThread());
2) Schedulers
Custom Scheduler
implementations will need to be re-implemented and any direct use of the Scheduler
interface will also need to be updated.
3) Subscription
If you have custom Subscription
implementations you will see they now need an isUnsubscribed()
method.
You can either add this method, or wrap your function using Subscriptions.create
and it will handle the isUnsubscribed
behavior and execute your function when unsubscribe()
is called.
It is recommended to use Subscriptions.create
for most Subscription
usage.
The Future...
We have most if not all operators from Rx.Net that we want or intend to port. We think we have got the create
/subscribe
signatures as we want and the Subscription
and Scheduler
interfaces are now clean. There is at least one more major topic related to back pressure that may result in signature change in a future release. Beyond that no further major signature changing work is expected prior to 1.0.
We still need to improve on some of the Subject
implementations still, particularly ReplaySubject
. We are beginning to focus after this release on cleaning up all of the operator implementations, stabilizing, fixing bugs and performance tuning.
As we get closer to 1.0 there will be a release that focused on deleting all deprecated methods so it is suggested to start migrating off of them.
We appreciate your usage, feedback and contributions and hope the library is creating value for you!
Pull Requests
- Pull 767 Zip fix for multiple onCompleted and moved unsubscribe outside the lock.
- Pull 770 Bind Operator
- Pull 778 Fix zip race condition
- Pull 784 Lift and Observer+Subscription
- Pull 793 Observer + Subscriber
- Pull 796 Add Subscription.isUnsubscribed()
- Pull 797 Scheduler Outer/Inner [Preview]
- Pull 805 Fix CompositeException
- Pull 785 Reimplement Zip Operator Using Lift [Preview]
- Pull 814 RunAsync method for outputting multiple values
- Pull 812 Fixed OperationSubscribeOn so OperationConditionalsTest works again.
- Pull 816 One global onCompleted object
- Pull 818 CompositeSubscription memory reduction
- Pull 817 Scala Scheduler Bindings Fix
- Pull 819 CompositeSubscription performance increase
- Pull 781 Fixed buglet in join binding, simplified types
- Pull 783 Implement some Android UI related operators
- Pull 821 Update to use Subscriber/Subscriptions.create
- Pull 826 Return wrapped Subscription
- Pull 824 Set setDaemon on NewThreadScheduler
- Pull 828 Repeat Operator
- Pull 827 Fixed cut & paster error in io scheduler
- Pull 833 Take operator was breaking the unsubscribe chain
- Pull 822 Reimplement 'subscribeOn' using 'lift'
- Pull 832 Issue #831 Fix for OperationJoin race condition
- Pull 834 Update clojure for 0.17
- Pull 839 Error Handling: OnErrorNotImplemented and java.lang.Error
- Pull 838 Make Scala OnCompleted Notification an object
- Pull 837 Perf with JMH
- Pull 841 Range OnSubscribe
- Pull 842 Test Unsubscribe
- Pull 845 Fix problem with Subscription
- Pull 847 Various Changes While Fixing GroupBy
- Pull 849 Add 'Fragment-Host' to rxjava-contrib modules for OSGi
- Pull 851 Reimplement the timeout operator and fix timeout bugs
- Pull 846 Added overloaded createRequest method that takes an HttpContext instance
- Pull 777 Fixed testSingleSourceManyIterators
- Pull 852 rxjava-debug
- Pull 853 StringObservable Update
- Pull 763 Added support for custom functions in combineLatest.
- Pull 854 The onCreate hook disappeared
- Pull 857 Change Lift to use rx.Observable.Operator
- Pull 859 Add 'Fragment-Host' to rxjava-contrib/debug module for OSGi
- Pull 860 Fixing the generics for merge and lift
- Pull 863 Optimize SwingMouseEventSource.fromRelativeMouseMotion
- Pull 862 Update the timeout docs
- Pull 790 Convert to scan to use lift
- Pull 866 Update OperationScan to OperatorScan
- Pull 870 Add the selector variants of timeout in RxScala
- Pull 874 Update CompositeSubscriptionTest.java
- Pull 869 subscribeOn + groupBy
- Pull 751 Provide Observable.timestamp(Scheduler) to be used in the tests.
- Pull 878 Scheduler.scheduleRecursive
- Pull 877 Correct synchronization guard in groupByUntil
- Pull 880 Force ViewObservable be subscribed and unsubscribed in the UI thread
- Pull 887 Remove Bad Filter Logic
- Pull 890 Split SubscribeOn into SubscribeOn/UnsubscribeOn
- Pull 891 Eliminate rx.util.* dumping grounds
- Pull 881 Lift Performance
- Pull 893 Change Parallel to use Long instead of Int
- Pull 894 Synchronized Operator Check for isTerminated
- Pull 885 Fixed an issue with the from(Reader) added a bunch of unit tests.
- Pull 896 removing java 7 dep
- Pull 883 Make Subscriptions of SwingObservable thread-safe
- Pull 895 Rewrite OperationObserveFromAndroidComponent to OperatorObserveFromAndroid
- Pull 892 onErrorFlatMap + OnErrorThrowable
- Pull 898 Handle illegal errors thrown from plugin
- Pull 901 GroupBy Unit Test from #900
- Pull 902 Fixed NullPointerException that may happen on timeout
- Pull 903 Scheduler.Recurse fields should be private
- Pull 904 Merge: Unsubscribe Completed Inner Observables
- Pull 905 RxJavaSchedulers Plugin
- Pull 909 Scheduler Plugin Refactor
- Pull 910 Remove groupBy with selector
- Pull 918 Operator: doOnTerminate
- Pull 919 BugFix: Zip Never Completes When Zero Observables
- Pull 920 Delete Deprecated onSubscribeStart That Doesn't Work
- Pull 922 Changes made while integrating it with our internal system
- Pull 924 Localized Operator Error Handling
- Pull 925 Rxjava clojure bindings final
- Pull 926 TestSubscriber: Default onError and Terminal Latch Behavior
- Pull 927 TestSubscriber lastSeenThread
- Pull 936 Skip fixed
- Pull 942 MathObservable
- Pull 944 OperationRetry -> OperatorRetry
- Pull 945 refactor the debug hooks before they become a breaking change.
- Pull 934 add Observable.startWith(Observable) method and unit test
- Pull 929 correct link to maven search
- Pull 923 Observable creation from Subscriber[T]=>Unit for Scala
- Pull 931 A number of improvements to OperatorObserveFromAndroidComponent
- Pull 950 Add support for Eclipse PDE
Artifacts: Maven Central