Proposal: Lift Operator (was 'bind') #746
Comments
At first look, it definitely solves the take(n) problem, as the downstream take can now access the cancellation token for the upstream (btw, loops should exit with return instead of break in case of isUnsubscribed). However, external users are still unable to cancel the long-running operation, as subscribe won't return the subscription until it has finished. One would need to have an overload where the cancellation token is passed in from the outside as well (or a binding which leaks the OperatorSubscription):
OperatorSubscription os = new OperatorSubscription();
Schedulers.computation().schedule(() -> os.unsubscribe(), 1, TimeUnit.SECONDS);
Obsurvable.from(1).repeat().subscribe(os, new Observer<T>() { ... }); // void
But it doesn't work at the moment, as from(1) unsubscribes the os and repeat can't really repeat it (it prints 1 and two onCompleted). If I remove the unsubscribe and change repeat to not emit onCompleted, it works as expected and honors the RxJava contract better (see 8415495 and 8415517). After the modification it appears to me that we replace one problem with another: now one would need to carefully check isUnsubscribed almost everywhere and think twice about when to call unsubscribe.
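To make the external-cancellation point concrete, here is a minimal sketch under assumed stand-in types: ExternalTokenSketch, Source, ones() and runFor are hypothetical names, not the prototype's API, and java.util.function.BiConsumer plays the role of Action2. The caller creates the token, hands it to a void subscribe overload, and a timer thread cancels it while the synchronous loop is still running:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical sketch: the caller owns the cancellation token and passes it in, so another
// thread can cancel while subscribe() is still blocked in a synchronous, endless loop.
final class ExternalTokenSketch {
    interface Observer<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }

    // Stand-in for OperatorSubscription; only the cancellation flag matters here.
    static final class OperatorSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean();
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    static final class Source<T> {
        private final BiConsumer<Observer<T>, OperatorSubscription> onSubscribe;
        Source(BiConsumer<Observer<T>, OperatorSubscription> onSubscribe) { this.onSubscribe = onSubscribe; }

        // The overload discussed above: token supplied from outside, nothing returned.
        void subscribe(OperatorSubscription os, Observer<T> o) { onSubscribe.accept(o, os); }
    }

    // Endless synchronous stream of 1s, similar in spirit to from(1).repeat() without a scheduler.
    static Source<Integer> ones() {
        return new Source<Integer>((o, s) -> {
            while (!s.isUnsubscribed()) {
                o.onNext(1);
            }
        });
    }

    // Cancels from a timer thread after delayMillis; returns how many items were delivered.
    static long runFor(long delayMillis) {
        OperatorSubscription os = new OperatorSubscription();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.schedule(os::unsubscribe, delayMillis, TimeUnit.MILLISECONDS);
        AtomicLong count = new AtomicLong();
        try {
            // Blocks until the timer thread flips the token.
            ones().subscribe(os, new Observer<Integer>() {
                @Override public void onNext(Integer t) { count.incrementAndGet(); }
                @Override public void onError(Throwable e) { }
                @Override public void onCompleted() { }
            });
        } finally {
            timer.shutdown();
        }
        return count.get();
    }
}
```

runFor(50) blocks for roughly 50 ms and then returns a positive count; without a token supplied from outside there would be nothing to cancel, because subscribe never returns.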
I like this approach. It keeps the user-facing behaviour the same while allowing for a simpler implementation. After a first look, it seems that upstream operators shouldn't be able to unsubscribe downstream operators. So, bind would change to:
public <R> Obsurvable<R> bind(final Func2<Observer<R>, OperatorSubscription, Observer<T>> bind) {
return new Obsurvable<R>(new Action2<Observer<R>, OperatorSubscription>() {
@Override
public void call(Observer<R> o, OperatorSubscription s) {
OperatorSubscription s2 = new OperatorSubscription();
s.add(s2);
f.call(bind.call(o, s), s2);
}
});
}
Take can unsubscribe from all the upstream operators, but these can't unsubscribe Take (which would be notified through onCompleted).
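A hedged sketch of the child-subscription rule, assuming minimal stand-in types (ChildSubscriptionSketch, Obs, SOURCE, take and collectInto are hypothetical, with BiConsumer/BiFunction standing in for Action2/Func2). bind hands the operator the downstream subscription s but gives the upstream only a child s2 added to s, so cancelling s reaches the source, while the source cancelling s2 never reaches the downstream:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

// Hypothetical sketch: bind gives the operator the downstream subscription s, but the
// upstream only ever sees a child s2 that was added to s.
final class ChildSubscriptionSketch {
    interface Observer<T> { void onNext(T t); void onError(Throwable e); void onCompleted(); }

    // Minimal OperatorSubscription: cancelling a parent cascades to children, never upward.
    static final class OperatorSubscription {
        private final List<OperatorSubscription> children = new ArrayList<>();
        private boolean unsubscribed;

        void add(OperatorSubscription child) {
            synchronized (this) { if (!unsubscribed) { children.add(child); return; } }
            child.unsubscribe(); // parent already cancelled: cancel the late child at once
        }

        void unsubscribe() {
            List<OperatorSubscription> toCancel;
            synchronized (this) {
                if (unsubscribed) return;
                unsubscribed = true;
                toCancel = new ArrayList<>(children);
                children.clear();
            }
            for (OperatorSubscription c : toCancel) c.unsubscribe();
        }

        synchronized boolean isUnsubscribed() { return unsubscribed; }
    }

    static final class Obs<T> {
        final BiConsumer<Observer<T>, OperatorSubscription> f;
        Obs(BiConsumer<Observer<T>, OperatorSubscription> f) { this.f = f; }

        <R> Obs<R> bind(BiFunction<Observer<R>, OperatorSubscription, Observer<T>> op) {
            return new Obs<R>((o, s) -> {
                OperatorSubscription s2 = new OperatorSubscription();
                s.add(s2);
                f.accept(op.apply(o, s), s2);
            });
        }
    }

    // Upstream: emits 1..100, then cancels its own (child) subscription, like from(1) did.
    static final Obs<Integer> SOURCE = new Obs<Integer>((o, s) -> {
        for (int i = 1; i <= 100 && !s.isUnsubscribed(); i++) o.onNext(i);
        if (!s.isUnsubscribed()) o.onCompleted();
        s.unsubscribe(); // only reaches the child s2
    });

    // take(n): forwards n items, completes, then cancels s, which cascades to the upstream child.
    static <T> Obs<T> take(Obs<T> source, int n) {
        return source.<T>bind((o, s) -> new Observer<T>() {
            private int seen;
            @Override public void onNext(T t) {
                if (s.isUnsubscribed()) return;
                o.onNext(t);
                if (++seen == n) { o.onCompleted(); s.unsubscribe(); }
            }
            @Override public void onError(Throwable e) { if (!s.isUnsubscribed()) o.onError(e); }
            @Override public void onCompleted() { if (!s.isUnsubscribed()) o.onCompleted(); }
        });
    }

    static <T> Observer<T> collectInto(List<T> sink) {
        return new Observer<T>() {
            @Override public void onNext(T t) { sink.add(t); }
            @Override public void onError(Throwable e) { }
            @Override public void onCompleted() { }
        };
    }
}
```

With take(SOURCE, 3) the root subscription ends up cancelled and only 1, 2, 3 are delivered; with take(SOURCE, 1000) the source completes and cancels its own child, but the root subscription stays active.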
We can write merge like so:
@Override
public void onNext(Obsurvable<T> innerObsurvable) {
Subscription innerSubscription = innerObsurvable.subscribe(new Observer<T>() {
// ...
});
outerSubscription.add(innerSubscription);
}
We already have to protect against onNext calls arriving after a call to onCompleted and unsubscribe, so first subscribing to the inner observable and then adding the inner subscription to the composite won't introduce any new interleaving. This seems to allow for option 2) b) while also allowing people to create arbitrary nested observables, or am I missing something?
Each of the inner I think we may just want to provide a
public final void subscribe(Observer<? super T> observer, OperatorSubscription s)
// or
public final void observe(Observer<? super T> observer, OperatorSubscription s)
I think I'd prefer the different name
UPDATE
These signatures only work for "one shot" unless they create a new
public final void observe(Observer<? super T> observer, Func0<OperatorSubscription> subscriptionFactory)
I have yet to see actual use code where they use the The only time I'm aware of a
How is this any different than now? We currently have to very carefully consider where to place a I also don't see it as any more sensitive about when to call On top of that, with the current implementation we have to deal with the fact that the approach doesn't work for synchronous use cases, and we have to keep answering questions from people as to why their code doesn't behave as they expect, and we keep telling them "use Schedulers", which in effect means Rx is only useful if all data sources are async (or small enough to not be a problem ... which kind of defeats the purpose). However, this is actually only half the answer, as in Rx.Net those synchronous cases work by putting everything on a recursive
I need to play with this a bit to better understand and will then respond...
Right! I forgot about that case. The overload taking a subscription factory seems like the better approach then.
The
public void onNext(Obsurvable<T> innerObsurvable) {
innerObsurvable.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
synchronized (o) {
o.onCompleted();
}
}
@Override
public void onError(Throwable e) {
synchronized (o) {
o.onError(e);
}
}
@Override
public void onNext(T a) {
synchronized (o) {
o.onNext(a);
}
}
}, new Func0<OperatorSubscription>() {
@Override
public OperatorSubscription call() {
OperatorSubscription innerSubscription = new OperatorSubscription();
outerSubscription.add(innerSubscription);
return innerSubscription;
}
});
}
Using a
public void subscribe(Observer<T> o, Func0<OperatorSubscription> sf) {
f.call(o, sf.call());
}
It's a little unnecessary for a private operator "doing the right thing" and involves closing over the
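A minimal sketch of the factory-based variant, assuming stand-in types and java.util.function.Supplier in place of Func0 (SubscriptionFactorySketch, mergeInto and collectInto are hypothetical). Each observe call asks the factory for a fresh inner subscription that is registered with the outer one, so the same source can be observed more than once and a single outer unsubscribe cancels every inner. The sketch is single-threaded, so the synchronized (o) blocks from the merge snippet are left out:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical sketch: observe(Observer, factory) asks the factory for a fresh subscription
// on every call, so each inner subscription can be registered with an outer one.
final class SubscriptionFactorySketch {
    interface Observer<T> { void onNext(T t); void onError(Throwable e); void onCompleted(); }

    // Minimal OperatorSubscription: cancelling a parent cancels its children.
    static final class OperatorSubscription {
        private final List<OperatorSubscription> children = new ArrayList<>();
        private boolean unsubscribed;
        void add(OperatorSubscription child) {
            synchronized (this) { if (!unsubscribed) { children.add(child); return; } }
            child.unsubscribe(); // added after cancellation: cancel immediately
        }
        void unsubscribe() {
            List<OperatorSubscription> toCancel;
            synchronized (this) {
                if (unsubscribed) return;
                unsubscribed = true;
                toCancel = new ArrayList<>(children);
                children.clear();
            }
            toCancel.forEach(OperatorSubscription::unsubscribe);
        }
        synchronized boolean isUnsubscribed() { return unsubscribed; }
    }

    static final class Obs<T> {
        private final BiConsumer<Observer<T>, OperatorSubscription> f;
        Obs(BiConsumer<Observer<T>, OperatorSubscription> f) { this.f = f; }
        // Supplier stands in for Func0<OperatorSubscription>.
        void observe(Observer<T> o, Supplier<OperatorSubscription> sf) { f.accept(o, sf.get()); }
    }

    // merge-style fan-in: each inner gets its own subscription, registered with outer.
    static <T> List<OperatorSubscription> mergeInto(List<Obs<T>> inners, Observer<T> o,
                                                    OperatorSubscription outer) {
        List<OperatorSubscription> created = new ArrayList<>();
        for (Obs<T> inner : inners) {
            inner.observe(o, () -> {
                OperatorSubscription innerSubscription = new OperatorSubscription();
                outer.add(innerSubscription);
                created.add(innerSubscription);
                return innerSubscription;
            });
        }
        return created;
    }

    static <T> Observer<T> collectInto(List<T> sink) {
        return new Observer<T>() {
            @Override public void onNext(T t) { sink.add(t); }
            @Override public void onError(Throwable e) { }
            @Override public void onCompleted() { }
        };
    }
}
```

Observing the same source twice yields two distinct inner subscriptions, and cancelling the outer subscription cancels both.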
Updated prototype code with This would make the Thus, an The "public" signature of
I've implemented zip with this new paradigm and it felt a bit odd:
public <U, R> Obsurvable<R> zip(Obsurvable<U> other, Func2<T, U, R> resultSelector) {
Obsurvable<T> t = this;
return Obsurvable.create((o, s) -> {
Object guard = new Object();
Queue<T> leftQueue = new LinkedList<>();
Queue<U> rightQueue = new LinkedList<>();
AtomicBoolean leftDone = new AtomicBoolean();
AtomicBoolean rightDone = new AtomicBoolean();
OperatorSubscription tsub = new OperatorSubscription();
OperatorSubscription usub = new OperatorSubscription();
s.add(tsub);
s.add(usub);
Action0 stride = () -> {
boolean done = false;
synchronized (guard) {
while (!leftQueue.isEmpty()
&& !rightQueue.isEmpty()
&& !s.isUnsubscribed()) {
try {
o.onNext(resultSelector.call(leftQueue.poll(), rightQueue.poll()));
} catch (Throwable e) {
o.onError(e);
s.unsubscribe();
return;
}
}
if (!s.isUnsubscribed()) {
if ((leftQueue.isEmpty() && leftDone.get())
|| (rightQueue.isEmpty() && rightDone.get())) {
done = true;
o.onCompleted();
}
}
}
if (done) {
s.unsubscribe();
}
};
t.observe(new Observer<T>() {
@Override
public void onNext(T args) {
synchronized (guard) {
leftQueue.offer(args);
}
stride.call();
}
@Override
public void onError(Throwable e) {
synchronized (guard) {
o.onError(e);
}
s.unsubscribe();
}
@Override
public void onCompleted() {
synchronized (guard) {
leftDone.set(true);
}
stride.call();
}
}, () -> tsub);
other.observe(new Observer<U>() {
@Override
public void onNext(U args) {
synchronized (guard) {
rightQueue.offer(args);
}
stride.call();
}
@Override
public void onError(Throwable e) {
synchronized (guard) {
o.onError(e);
}
s.unsubscribe(); // mirror the left-hand onError so the whole zip is torn down
}
@Override
public void onCompleted() {
synchronized (guard) {
rightDone.set(true);
}
stride.call();
}
}, () -> usub);
});
}
And a test method:
Obsurvable.from(1).repeat().take(10000).zip(
Obsurvable.from(2).repeat().take(5), (a, b) -> a + b).take(2)
.subscribe(System.out::println);
This again doesn't work with the reference repeat(), which should not call onCompleted when its upstream completes, because it will subscribe to the upstream again. Once that is fixed, the above example works. The oddity comes from where and what to call unsubscribe on; is this okay?
OperatorSubscription tsub = new OperatorSubscription();
OperatorSubscription usub = new OperatorSubscription();
s.add(tsub);
s.add(usub);
And should I call
It's probably unrealistic to assume that all of the operations can be rewritten in one pull request. This method will allow existing operators that use
public static <X> Obsurvable<X> create(final OnSubscribeFunc<X> onSub) {
return create(new Action2<Observer<X>, OperatorSubscription>() {
@Override
public void call(Observer<X> in, OperatorSubscription s) {
Subscription sub = onSub.onSubscribe(in);
s.add(sub);
}
});
}
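A hedged sketch of that bridge, reduced to stand-in types: LegacyCreateAdapterSketch and its nested types are hypothetical, and the new-style create returns a plain BiConsumer instead of an Obsurvable. The old function still returns its Subscription, and the adapter simply adds it to the injected OperatorSubscription, so cancelling the new-style subscription reaches the legacy work:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of bridging an old-style OnSubscribeFunc (which returns its
// Subscription) to the new form (which receives one).
final class LegacyCreateAdapterSketch {
    interface Observer<T> { void onNext(T t); void onError(Throwable e); void onCompleted(); }
    interface Subscription { void unsubscribe(); }

    // Old style: subscribe and return something that can cancel the work.
    interface OnSubscribeFunc<T> { Subscription onSubscribe(Observer<T> o); }

    // Minimal OperatorSubscription: collects Subscriptions and cancels them together.
    static final class OperatorSubscription implements Subscription {
        private final List<Subscription> children = new ArrayList<>();
        private boolean unsubscribed;
        void add(Subscription s) {
            synchronized (this) { if (!unsubscribed) { children.add(s); return; } }
            s.unsubscribe(); // already cancelled: cancel the late arrival immediately
        }
        @Override public void unsubscribe() {
            List<Subscription> toCancel;
            synchronized (this) {
                if (unsubscribed) return;
                unsubscribed = true;
                toCancel = new ArrayList<>(children);
                children.clear();
            }
            toCancel.forEach(Subscription::unsubscribe);
        }
    }

    // New-style create, reduced to a BiConsumer: run the old function, keep its Subscription.
    static <T> BiConsumer<Observer<T>, OperatorSubscription> create(OnSubscribeFunc<T> onSub) {
        return (in, s) -> s.add(onSub.onSubscribe(in));
    }
}
```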
@akarnokd Here is another implementation of The only place an There are no places where it must be checked. The most interesting unit test for this is The thing that makes this work is that whenever a subscription occurs via
os[i].observe((InnerObserver) observers[i], new Func0<OperatorSubscription>() {
@Override
public OperatorSubscription call() {
return childSubscription;
}
});
This could be simplified to the following if we want to make a user have to "do the right thing" when using it:
os[i].observe((InnerObserver) observers[i], childSubscription);
The same
@akarnokd I think the root of your zip problems comes from the use of
For the last couple of days I've been building on this to create a visual debugger of sorts for Rx. Because of the OperatorSubscription.add() method, the association between inner and outer observers (as in merge) can be discovered. Here is a sample of the raw data that I was able to get with four hooks (on create, on bind, on add and on observe) for the code
In the current API, the fundamental thing you do with an observable object is subscribe to it by passing an observer and getting a subscription back. You can invoke that behavior on In the proposed new API, the fundamental thing you do is observe by passing in both an observer and a subscription. You can invoke that behavior with
I think you might be right about the Func0 being unnecessary. My current workspace is too far removed from the baseline, so it's hard to say for sure, but I was able to remove the Func0 implementations and it still works for my use case.
While implementing various operators I ran into The prototype code is still here: https://gist.github.com/benjchristensen/8367765 It has grown to include I have also split out a simpler example (ObsurvableBind.java) with just the The essential parts of the code are currently:
public static <T> ObsurvableBind<T> create(final Action2<Observer<T>, OperatorSubscription> f) {
return new ObsurvableBind<T>(f);
}
public void observe(Observer<T> o, OperatorSubscription sf) {
f.call(o, sf);
}
public Subscription subscribe(Observer<T> o) {
final OperatorSubscription os = new OperatorSubscription();
observe(o, os);
return os;
}
public static interface Operator<T> extends Observer<T> {
public OperatorSubscription getSubscription();
}
public <R> ObsurvableBind<R> bind(final Func2<Observer<R>, OperatorSubscription, Operator<T>> bind) {
return new ObsurvableBind<R>(new Action2<Observer<R>, OperatorSubscription>() {
@Override
public void call(Observer<R> o, final OperatorSubscription s) {
Operator<T> ot = bind.call(o, s);
observe(ot, ot.getSubscription());
}
});
}
This achieves the generally desired traits, but I'd like to see if we can come up with a better way of representing the The
Here is another possible variant of the signature: https://gist.github.com/benjchristensen/8497234 Instead of
public static <T> ObsurvableBind2<T> create(final Action1<Operator<T>> f) {
return new ObsurvableBind2<T>(f);
}
public <R> ObsurvableBind2<R> bind(final Func1<Operator<R>, Operator<T>> bind) {
return new ObsurvableBind2<R>(new Action1<Operator<R>>() {
@Override
public void call(Operator<R> o) {
observe(bind.call(o));
}
});
}
public void observe(Operator<T> o) {
f.call(o);
}
public Subscription subscribe(final Observer<T> o) {
final OperatorSubscription os = new OperatorSubscription();
observe(createOperator(o, os));
return os;
}
public static interface Operator<T> extends Observer<T> {
/**
* Get the Subscription intended to pass up to the source being bound to.
* <p>
* In other words, the subscription from Operator -> Source
*/
public OperatorSubscription getSubscription();
}
Here is another variant: https://gist.github.com/benjchristensen/8506432 This eliminates the
public static abstract class Operator<T> implements Observer<T>, Subscription {
private final CompositeSubscription cs;
Operator(CompositeSubscription cs) {
this.cs = cs;
}
Operator() {
this.cs = new CompositeSubscription();
}
/**
* Used to register an unsubscribe callback.
*/
public final void add(Subscription s) {
cs.add(s);
}
@Override
public final void unsubscribe() {
cs.unsubscribe();
}
public final boolean isUnsubscribed() {
return cs.isUnsubscribed();
}
}
The
public static <T> ObsurvableBind3<T> create(final Action1<Operator<T>> f) {
return new ObsurvableBind3<T>(f);
}
public <R> ObsurvableBind3<R> bind(final Func1<Operator<R>, Operator<T>> bind) {
return new ObsurvableBind3<R>(new Action1<Operator<R>>() {
@Override
public void call(Operator<R> o) {
observe(bind.call(o));
}
});
}
public void observe(Operator<T> o) {
f.call(o);
}
@rickbw Are these different variants solving your concerns? What improvements would you make to their signatures and which do you prefer? @duarten I believe these new signatures address the problem you brought up regarding the passing of
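To see how the ObsurvableBind3 shape composes, here is a trimmed-down sketch under stated assumptions: OperatorClassSketch, naturals, take and collectInto are hypothetical, Consumer/Function stand in for Action1/Func1, and the CompositeSubscription is a minimal local class rather than RxJava's. take registers its upstream Operator with the downstream one, so a downstream unsubscribe reaches the source, while take cancelling itself stops only the upstream:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: an Operator is both the Observer and the Subscription, so
// create/bind only ever pass one object around.
final class OperatorClassSketch {
    interface Observer<T> { void onNext(T t); void onError(Throwable e); void onCompleted(); }
    interface Subscription { void unsubscribe(); boolean isUnsubscribed(); }

    // Minimal composite: cancels everything added to it, including late additions.
    static final class CompositeSubscription implements Subscription {
        private final List<Subscription> subs = new ArrayList<>();
        private boolean unsubscribed;
        void add(Subscription s) {
            synchronized (this) { if (!unsubscribed) { subs.add(s); return; } }
            s.unsubscribe();
        }
        @Override public void unsubscribe() {
            List<Subscription> toCancel;
            synchronized (this) {
                if (unsubscribed) return;
                unsubscribed = true;
                toCancel = new ArrayList<>(subs);
                subs.clear();
            }
            toCancel.forEach(Subscription::unsubscribe);
        }
        @Override public synchronized boolean isUnsubscribed() { return unsubscribed; }
    }

    abstract static class Operator<T> implements Observer<T>, Subscription {
        private final CompositeSubscription cs = new CompositeSubscription();
        public final void add(Subscription s) { cs.add(s); }
        @Override public final void unsubscribe() { cs.unsubscribe(); }
        @Override public final boolean isUnsubscribed() { return cs.isUnsubscribed(); }
    }

    static final class Obs<T> {
        private final Consumer<Operator<T>> f;
        private Obs(Consumer<Operator<T>> f) { this.f = f; }
        static <T> Obs<T> create(Consumer<Operator<T>> f) { return new Obs<T>(f); }
        <R> Obs<R> bind(Function<Operator<R>, Operator<T>> bind) {
            return new Obs<R>(o -> observe(bind.apply(o)));
        }
        void observe(Operator<T> o) { f.accept(o); }
    }

    // Endless synchronous source; it stops only because the Operator reports isUnsubscribed().
    static Obs<Integer> naturals() {
        return Obs.<Integer>create(o -> {
            for (int i = 1; !o.isUnsubscribed(); i++) o.onNext(i);
        });
    }

    static <T> Obs<T> take(Obs<T> source, int n) {
        return source.<T>bind(child -> {
            Operator<T> parent = new Operator<T>() {
                private int seen;
                @Override public void onNext(T t) {
                    if (isUnsubscribed()) return;
                    child.onNext(t);
                    if (++seen == n) { child.onCompleted(); unsubscribe(); } // cancels upstream only
                }
                @Override public void onError(Throwable e) { child.onError(e); }
                @Override public void onCompleted() { if (!isUnsubscribed()) child.onCompleted(); }
            };
            child.add(parent); // cancelling downstream also cancels upstream
            return parent;
        });
    }

    static <T> Operator<T> collectInto(List<T> sink) {
        return new Operator<T>() {
            @Override public void onNext(T t) { sink.add(t); }
            @Override public void onError(Throwable e) { }
            @Override public void onCompleted() { }
        };
    }
}
```

take(naturals(), 5) delivers 1 to 5 from an endless synchronous loop; observing with an already-unsubscribed Operator delivers nothing, because add cancels the late upstream immediately.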
public static abstract class Operator implements Observer, Subscription { }
This I like.
On Sun, Jan 19, 2014 at 4:44 PM, Ben Christensen
This thread blessed me with the following insight: If we look at the pull-oriented
interface Iterator<T> {
boolean hasNext();
T next();
}
and try to find its push-oriented dual, we get something like
interface PushIterator<T> {
boolean wantsNext();
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
which is basically the same as @benjchristensen's
public static abstract class Operator<T> implements Observer<T>, Subscription
where That said, I like the
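A small sketch of the PushIterator reading, assuming the interface as written above plus a hypothetical push helper and FirstN consumer: the producer asks wantsNext() before each onNext(), so a consumer can stop an infinite pull-based Iterator without a separate Subscription object. Error propagation is omitted for brevity:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the push-oriented dual: the producer calls wantsNext() (dual of
// hasNext()) before every onNext() (dual of next()).
final class PushIteratorSketch {
    interface PushIterator<T> {
        boolean wantsNext();
        void onNext(T t);
        void onError(Throwable e);
        void onComplete();
    }

    // Drains a pull-based Iterator into a PushIterator; returns how many items were pulled.
    // Error propagation through onError is left out to keep the sketch short.
    static <T> int push(Iterator<T> source, PushIterator<T> sink) {
        int pulled = 0;
        while (sink.wantsNext()) {
            if (!source.hasNext()) {
                sink.onComplete();
                break;
            }
            sink.onNext(source.next());
            pulled++;
        }
        return pulled;
    }

    // A consumer that only wants the first `limit` items.
    static final class FirstN<T> implements PushIterator<T> {
        final List<T> received = new ArrayList<>();
        boolean completed;
        private final int limit;
        FirstN(int limit) { this.limit = limit; }
        @Override public boolean wantsNext() { return received.size() < limit; }
        @Override public void onNext(T t) { received.add(t); }
        @Override public void onError(Throwable e) { }
        @Override public void onComplete() { completed = true; }
    }
}
```

Against an endless iterator, FirstN(3) causes exactly three pulls and no completion; against a two-element list it receives both items and then onComplete.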
Samuel, the subscription comes from the IDisposable, which is not in the Java Iterable interface.
Yes, it solves that problem. I like it!
- Changed `bind` signature to match the variant discussed at ReactiveX#746 (comment)
- Updated code to new signature.
- Re-implemented GroupBy operator with `bind`
Anyone interested in this please take a look at the pull request I just submitted. @headinthebox and @samuelgruetter I could use your help fixing the Scala module. |
Thanks everyone for your discussion on this. I have merged the implementation. Next steps are:
Sorry for chiming in late. I'm a bit all over the place these days and some of the latest developments in the project slipped past me. I'll comment here w.r.t. #770 as well. I read the proposal and I like the suggested changes. One concern I have is that observers now have to extend an abstract class. In Java, it's quite intrusive if a library or a framework forces superclasses on you, since it lacks multiple inheritance. We had a few components in our app that would behave like observers, but inherited from Android framework classes. This was quite nice, since they would provide the "glue" between the Android framework and our Rx-based client code. We now have to convert these to observers first by delegating to an inner class, which makes their public API a bit awkward. More problematic though is test friendliness. Maybe I'm missing something, but a very common recipe that we adopted in dozens of unit tests is the following:
This is not possible anymore, since FWIW, Mockito @Spy instances work, but it might not be wise to rely on spies. Maybe this could at least be mitigated by providing a non-final getter for the subscription which SafeObserver queries? At least then we'd be able to return a subscription for the mock that SafeObserver can accept. Ben also pointed me to Sorry, I'll have to work more with the current code base to get a deeper impression; I just wanted to throw this in as a first impression of sorts.
Thanks @mttkay for getting involved and providing feedback. I too have similar concerns about For unit tests your code would need to change like this:
Observer mockObserver = mock(Observer.class);
obj.getObservable().subscribe(new TestObserver(mockObserver));
ArgumentCaptor<Observer> onNextArgs = ArgumentCaptor.forClass(...);
verify(mockObserver).onNext(onNextArgs.capture());
// do expectations against onNextArgs
Going back to the discussion on signatures in #775, the choice is basically to either: a) change At this point the strongest argument has been to keep the public API clean and have a single The current signatures are:
// Observable.create
public final static <T> Observable<T> create(OnSubscribe<T> f)
// Observable.OnSubscribe typed function interface
public static interface OnSubscribe<T> extends Action1<Observer<? super T>>
// lift function
public <R> Observable<R> lift(final Func1<Observer<? super R>, Observer<? super T>> bind)
// Observer
public abstract class Observer<T> implements Subscription {
public abstract void onNext(T t);
public abstract void onError(Throwable e);
public abstract void onCompleted();
public final void add(Subscription s)
public final void unsubscribe()
public final boolean isUnsubscribed()
}
// Subject
public abstract class Subject<T, R> extends Observer<T> {
public abstract Observable<R> toObservable();
}
Anyone have a better design than this?
This code demonstrates what the new signatures allow:
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Observer<? super Integer> ob) {
for (int i = 1; i < 100000; i++) {
/*
* The Observer communicates whether it is unsubscribed
* so loops and sequential processing on the same thread
* can now unsubscribe.
*/
if (ob.isUnsubscribed()) {
System.out.println("--- Unsubscribed at: " + i);
return;
}
ob.onNext(i);
}
ob.onCompleted();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer i) {
System.out.println("Received: " + i);
if (i == 10) {
// an Observer can now unsubscribe
unsubscribe();
}
}
});
This outputs:
Note how the source This is equally beneficial if the Observable is made async by running on a separate thread, but still a sequential for-loop instead of trampolining (which is far slower than a loop, and both less obvious and more complicated to implement):
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Observer<? super Integer> ob) {
for (int i = 1; i < 100000; i++) {
/*
* The Observer communicates whether it is unsubscribed
* so loops and sequential processing on the same thread
* can now unsubscribe.
*/
if (ob.isUnsubscribed()) {
System.out.println("--- Unsubscribed at: " + i);
return;
}
ob.onNext(i);
}
ob.onCompleted();
}
// use subscribeOn so it is now async
}).subscribeOn(Schedulers.newThread()).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer i) {
System.out.println("Received: " + i);
if (i == 10) {
// an Observer can now unsubscribe
unsubscribe();
}
}
});
This means we do want the Unit testing is doable by using The remaining drawback is the inability to A relevant question is whether classes need to Similarly classes do not need to be an Right now We either need a new name for
Thanks for the write-up, Ben! I do agree this is very desirable. So an Observer's role is an active one now (it can unsubscribe) rather than I usually name interfaces after the behavior they enable. If we reintroduce Just my thoughts.
Can't we provide a default implementation of Observer like this?
public abstract class Observer<T> implements Subscription {
public abstract void onNext(T t);
public abstract void onError(Throwable e);
public abstract void onCompleted();
// default Subscription behaviour: delegate to a composite, as in the Operator variant above
protected final CompositeSubscription subscription = new CompositeSubscription();
public final void add(Subscription s) { subscription.add(s); }
public final void unsubscribe() { subscription.unsubscribe(); }
public final boolean isUnsubscribed() { return subscription.isUnsubscribed(); }
}
That is how it is implemented: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observer.java
If we go this path, an interface
public abstract class Observer<T> implements Subscription, Notifiable
Or we could leave
public abstract class Subscriber<T> implements Observer<T>, Subscription
Note that if we went to having That is probably fine though, as most Thus we could have methods like this in
void subscribe(Subscriber s)
Subscription subscribe(Observer o)
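A hedged sketch of that split, with hypothetical stand-in types (SubscriberSplitSketch, Obs, ListObserver): Observer stays a plain interface that Android classes or Mockito mocks can implement, Subscriber is the abstract class that is also a Subscription, and subscribe(Observer) wraps the Observer and returns the wrapper as its Subscription. In Java, passing a Subscriber selects the more specific subscribe(Subscriber) overload:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: Observer is a plain interface, Subscriber is both Observer and
// Subscription, and subscribe(Observer) wraps the Observer in a Subscriber.
final class SubscriberSplitSketch {
    interface Observer<T> { void onNext(T t); void onError(Throwable e); void onCompleted(); }
    interface Subscription { void unsubscribe(); boolean isUnsubscribed(); }

    abstract static class Subscriber<T> implements Observer<T>, Subscription {
        private volatile boolean unsubscribed;
        @Override public final void unsubscribe() { unsubscribed = true; }
        @Override public final boolean isUnsubscribed() { return unsubscribed; }
    }

    static final class Obs<T> {
        private final Consumer<Subscriber<? super T>> onSubscribe;
        Obs(Consumer<Subscriber<? super T>> onSubscribe) { this.onSubscribe = onSubscribe; }

        // Operators and consumers that need to cancel pass a Subscriber directly.
        void subscribe(Subscriber<? super T> s) { onSubscribe.accept(s); }

        // Plain Observers are wrapped; the wrapper doubles as the returned Subscription.
        Subscription subscribe(Observer<? super T> o) {
            Subscriber<T> wrapper = new Subscriber<T>() {
                @Override public void onNext(T t) { o.onNext(t); }
                @Override public void onError(Throwable e) { o.onError(e); }
                @Override public void onCompleted() { o.onCompleted(); }
            };
            subscribe(wrapper); // resolves to the more specific Subscriber overload
            return wrapper;
        }
    }

    // A plain Observer (no base class), e.g. something a test could mock.
    static final class ListObserver<T> implements Observer<T> {
        final List<T> items = new ArrayList<>();
        boolean completed;
        @Override public void onNext(T t) { items.add(t); }
        @Override public void onError(Throwable e) { }
        @Override public void onCompleted() { completed = true; }
    }
}
```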
I did an implementation using
Over the past couple of months several sources of inspiration have triggered the idea of adding a bind operator to Observable. I have played with an implementation and propose we adopt it for RxJava, but want to get feedback on signatures and find out if there are any use cases I'm completely missing that would negate the idea.
tldr; Prototype code => https://gist.github.com/benjchristensen/8367765
Jan 29th: Updated to lift instead of bind based on discussions in #775
Benefits
1) Synchronous Unsubscribe
The primary benefit of this proposal is that synchronous Observables can be unsubscribed. This means an infinite sequence of integers without a separate thread could now be used. A large Iterable followed by take(20) would actually only emit the first 20.
This is achieved by injecting Subscription into what we today call Observable.OnSubscribeFunc instead of it being returned by the function.
An Observable of this nature would check the Subscription inside its loop such as this:
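A minimal, self-contained sketch of such a loop, using stand-in types rather than the prototype's code (InjectedSubscriptionSketch, fromIterable and takeFromLargeIterable are hypothetical names): the subscription is injected instead of returned, and the loop checks it before pulling each element, so a million-element Iterable followed by a take(20)-style cancellation pulls and emits only 20 items:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;

// Hypothetical sketch: the Subscription is injected into the OnSubscribe function,
// and a synchronous fromIterable checks it on every iteration.
final class InjectedSubscriptionSketch {
    interface Observer<T> { void onNext(T t); void onError(Throwable e); void onCompleted(); }

    static final class OperatorSubscription {
        private volatile boolean unsubscribed;
        void unsubscribe() { unsubscribed = true; }
        boolean isUnsubscribed() { return unsubscribed; }
    }

    static <T> BiConsumer<Observer<T>, OperatorSubscription> fromIterable(Iterable<T> items) {
        return (o, s) -> {
            Iterator<T> it = items.iterator();
            // Check the injected subscription before pulling, so nothing is read after cancellation.
            while (!s.isUnsubscribed() && it.hasNext()) {
                o.onNext(it.next());
            }
            if (!s.isUnsubscribed()) o.onCompleted();
        };
    }

    /** Emulates fromIterable(1..1_000_000).take(n); returns {items delivered, items pulled}. */
    static int[] takeFromLargeIterable(int n) {
        int[] pulled = {0};
        Iterable<Integer> large = () -> new Iterator<Integer>() {
            private int current = 1;
            @Override public boolean hasNext() { return current <= 1_000_000; }
            @Override public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                pulled[0]++;
                return current++;
            }
        };
        List<Integer> delivered = new ArrayList<>();
        OperatorSubscription s = new OperatorSubscription();
        InjectedSubscriptionSketch.<Integer>fromIterable(large).accept(new Observer<Integer>() {
            @Override public void onNext(Integer t) {
                delivered.add(t);
                if (delivered.size() == n) s.unsubscribe(); // what take(n) would do
            }
            @Override public void onError(Throwable e) { }
            @Override public void onCompleted() { }
        }, s);
        return new int[] {delivered.size(), pulled[0]};
    }
}
```

takeFromLargeIterable(20) reports 20 delivered and 20 pulled out of a million available elements.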
2) Custom Operator Chaining
Because Java doesn't support extension methods, the only approach to applying custom operators without getting them added to rx.Observable is using static methods. This has meant code like this:
In reality we want:
Using bind we can get quite close to this:
Here is how the proposed bind method looks if all operators were applied with it:
3) Simpler Operator Implementations
The bind operator injects the necessary Observer and Subscription instances and eliminates (for most use cases) the need for manual subscription management. Because the Subscription is available in-scope, there are no awkward coding patterns needed for creating a Subscription, closing over it and returning it, and taking into account synchronous vs asynchronous execution.
For example, fromIterable is simply:
The take operator is:
4) Eliminate Need for CurrentThreadScheduler
A side-effect of injecting the Subscription and handling synchronous execution is that we can now use loops instead of trampolining for operators like repeat. This means we should not need the CurrentThreadScheduler (as far as I can tell) and the ugliness it takes to make synchronous unsubscribes work.
Here is the repeat operator without need for scheduling:
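A hedged sketch of repeat as a plain loop, assuming stand-in types (RepeatLoopSketch and firstN are hypothetical) and a source that finishes synchronously inside each call; an asynchronous source would need a different resubscription trigger. Each round gets a fresh child subscription, the inner onCompleted is swallowed because repeat is about to resubscribe (the from(1).repeat() issue raised in the comments), and the loop stops once the downstream subscription is cancelled:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of repeat without CurrentThreadScheduler or trampolining.
final class RepeatLoopSketch {
    interface Observer<T> { void onNext(T t); void onError(Throwable e); void onCompleted(); }

    static final class OperatorSubscription {
        private final List<OperatorSubscription> children = new ArrayList<>();
        private boolean unsubscribed;
        void add(OperatorSubscription child) {
            synchronized (this) { if (!unsubscribed) { children.add(child); return; } }
            child.unsubscribe();
        }
        synchronized void remove(OperatorSubscription child) { children.remove(child); }
        void unsubscribe() {
            List<OperatorSubscription> toCancel;
            synchronized (this) {
                if (unsubscribed) return;
                unsubscribed = true;
                toCancel = new ArrayList<>(children);
                children.clear();
            }
            toCancel.forEach(OperatorSubscription::unsubscribe);
        }
        synchronized boolean isUnsubscribed() { return unsubscribed; }
    }

    // repeat(source): resubscribe in a loop until the downstream subscription is cancelled.
    static <T> BiConsumer<Observer<T>, OperatorSubscription> repeat(BiConsumer<Observer<T>, OperatorSubscription> source) {
        return (o, s) -> {
            Observer<T> forward = new Observer<T>() {
                @Override public void onNext(T t) { if (!s.isUnsubscribed()) o.onNext(t); }
                @Override public void onError(Throwable e) {
                    if (!s.isUnsubscribed()) { o.onError(e); s.unsubscribe(); }
                }
                @Override public void onCompleted() { } // swallowed: the loop below resubscribes instead
            };
            while (!s.isUnsubscribed()) {
                OperatorSubscription round = new OperatorSubscription(); // fresh child per round
                s.add(round);
                source.accept(forward, round);
                s.remove(round); // keep the parent from accumulating finished rounds
            }
        };
    }

    /** Subscribes to repeat(source), cancels after n items, and returns what was received. */
    static <T> List<T> firstN(BiConsumer<Observer<T>, OperatorSubscription> source, int n) {
        List<T> got = new ArrayList<>();
        OperatorSubscription s = new OperatorSubscription();
        RepeatLoopSketch.<T>repeat(source).accept(new Observer<T>() {
            @Override public void onNext(T t) {
                got.add(t);
                if (got.size() == n) s.unsubscribe();
            }
            @Override public void onError(Throwable e) { throw new IllegalStateException(e); }
            @Override public void onCompleted() { throw new IllegalStateException("repeat must not complete"); }
        }, s);
        return got;
    }
}
```

A source that emits 1, completes and unsubscribes its own round (as from(1) did in the comments) is repeated until the consumer cancels after five items, and the consumer never sees onCompleted.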
5) Recursion/Loop Performance
The fromIterable use case is 20x faster when implemented as a loop instead of a recursive scheduler (see a18b8c1).
Many places where recursive scheduling was used to avoid stack overflow can be done as a loop instead.
Drawbacks
1) Observable.create Signature Change
Currently the Observable.create method is defined as:
This is effectively the same as:
To inject the Subscription we need it to instead be:
Note that there is no return type any longer; the Subscription is injected instead of returned.
Then to support operators conditionally attaching themselves to the injected Subscription, the signature is actually:
The OperatorSubscription simply adds void add(Subscription s) so each operator can register with it as needed.
Prototype Code
The prototype code can be found at https://gist.github.com/benjchristensen/8367765
The name of the class is Obsurvable and this code can be dropped into the rx package as Obsurvable.java alongside Observable.java for testing.
Design Decisions
If we agree that this proposal makes sense, there are some design decisions to make:
1) Action2/Func2 or New Type?
Right now the prototype code has this:
Should we create types that represent those? I think the create one probably should, for the same reason as before, to simplify the pain of dealing with generics. Thus it would be OnSubscribeFunc, OnSubscribe or something similar.
For the bind method I'm less opinionated, though it may be cleaner to provide a type that represents the Func2.
I did play with a variant of this that combined Observer and OperatorSubscription into a single type Operator to make the function simpler, but it conflated things and was quite troublesome with composition, as the Observer and Subscription had different lifecycles and intents.
2) Private Subscribe for Nested Observables
If you look at the MergeOperator code you'll see inside the onNext(Obsurvable<T> innerObservable) that it has this code:
This is reaching inside the Obsurvable to the private field f and invoking it.
This is equivalent to calling Obsurvable.subscribe except that it injects the Subscription rather than returning it.
I still want the normal Subscription Observable subscribe(Observer o) signature for public consumption, but am considering whether we should have:
a) an alternate public signature for subscription that looks like void Observable otherSubscribe(Observer o, Subscription s)
b) a private mechanism that operator implementations can somehow subscribe with (not sure yet how, without these only being implemented inside Observable itself)
I tend to prefer (b) so we don't leak implementation details, but (a) would allow anyone to create even the more complicated nested operators such as merge outside of Observable.java.
3) OperatorSubscription Name
Is this name okay? Where should it live? Is its signature correct?
4) Observable.create
Should we deprecate or keep the current signature?
If it exists alongside the new one and both have the create name accepting a single function, it will break Groovy and Clojure (and probably JRuby) as they won't be able to distinguish between overloads. Thus it likely is best to just do the breaking change cleanly and have a single create method with the new signature.
Is there any reason to have the old signature that returns Subscription instead of accepting it as an argument?
Operator Refactor
If we adopt this pattern it will take some time to retrofit all operators to use bind.
In Observable.java they would be coded something like this using bind:
Each operator would need to be changed to this new model.
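As an illustration of operators written for bind (later renamed lift, per the Jan 29th update), here is a minimal sketch with hypothetical stand-in types (BindChainingSketch, Obs, map, filter, range, demo): each operator is a function from the downstream Observer to the upstream Observer, so map and filter, wherever they are defined, chain fluently without static wrapping. Subscription handling is left out to keep it short:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: an operator maps the downstream Observer to the upstream Observer,
// and bind applies it. Subscription handling is omitted for brevity.
final class BindChainingSketch {
    interface Observer<T> { void onNext(T t); void onError(Throwable e); void onCompleted(); }

    static final class Obs<T> {
        private final Consumer<Observer<T>> onSubscribe;
        Obs(Consumer<Observer<T>> onSubscribe) { this.onSubscribe = onSubscribe; }
        void subscribe(Observer<T> o) { onSubscribe.accept(o); }

        // bind: wrap the downstream Observer with the operator, then subscribe upstream.
        <R> Obs<R> bind(Function<Observer<R>, Observer<T>> operator) {
            return new Obs<R>(o -> subscribe(operator.apply(o)));
        }
    }

    // These could live in any class, not just Observable.java.
    static <T, R> Function<Observer<R>, Observer<T>> map(Function<T, R> fn) {
        return child -> new Observer<T>() {
            @Override public void onNext(T t) { child.onNext(fn.apply(t)); }
            @Override public void onError(Throwable e) { child.onError(e); }
            @Override public void onCompleted() { child.onCompleted(); }
        };
    }

    static <T> Function<Observer<T>, Observer<T>> filter(Predicate<T> p) {
        return child -> new Observer<T>() {
            @Override public void onNext(T t) { if (p.test(t)) child.onNext(t); }
            @Override public void onError(Throwable e) { child.onError(e); }
            @Override public void onCompleted() { child.onCompleted(); }
        };
    }

    static Obs<Integer> range(int from, int to) {
        return new Obs<Integer>(o -> {
            for (int i = from; i <= to; i++) o.onNext(i);
            o.onCompleted();
        });
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        range(1, 6)
            .bind(BindChainingSketch.<Integer>filter(i -> i % 2 == 0))
            .bind(BindChainingSketch.<Integer, String>map(i -> "#" + i))
            .subscribe(new Observer<String>() {
                @Override public void onNext(String s) { out.add(s); }
                @Override public void onError(Throwable e) { }
                @Override public void onCompleted() { out.add("done"); }
            });
        return out;
    }
}
```

demo() yields #2, #4, #6 and then done.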
I suggest we leverage this to do other cleanup such as:
- Notification or use of materialize)
- /src/perf folder and add a performance test for each operator implementation similar to https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/test/java/rx/schedulers/SchedulerPerformanceTests.java and https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/test/java/rx/subjects/SubjectPerformanceTests.java
- Operation* to Operator* as a marker of which have been migrated (and to read better and match the package name)
I'd appreciate feedback and improvement. Thanks.