Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bind Operator #770

Merged
merged 23 commits into from
Jan 22, 2014
Merged

Bind Operator #770

merged 23 commits into from
Jan 22, 2014

Conversation

benjchristensen
Copy link
Member

Implementation of bind operator along with new create and subscribe methods and several reimplemented operators: fromIterable, toList, toSortedList, map, cast, timestamp, merge, flatMap, mergeMap, groupBy, parallel.

The bind signature is:

public <R> Observable<R> bind(final Func1<Operator<? super R>, Operator<? super T>> bind)

The new Operator class signature is:

public abstract class Operator<T> implements Observer<T>, Subscription

The create method now looks like:

public final static <T> Observable<T> create(final Action1<Operator<? super T>> f)

A new subscribe signature is:

public final void subscribe(Operator<? super T> o)

The use of these three new methods allows for chaining operators together and supporting unsubscription on both synchronous and asynchronous sources.

Status and Plan
  • I have temporarily disabled Scala and Kotlin builds until they are fixed.
  • The Groovy module has been retrofitted with an extension module to intercept and disambiguate the new create method from the deprecated one.
  • I have not yet tested Clojure or JRuby
  • I am pretty certain I have the generics wrong on OperatorMerge so am using it in non-typesafe manner right now

This design has been reviewed with several people on Github and in person. Besides resolving the items listed above, my intention is as follows:

  • invite feedback and review on this pull request in the next 12-24 hours
  • merge into trunk if nothing major comes up
  • work towards resolving issues with the language adaptors and polishing any rough edges of the new changes
  • finalize bind signatures
  • consider deprecating or eliminating CurrentThreadScheduler (if it needs to exist at all, perhaps rename to TrampolineScheduler to remove confusion between Immediate and CurrentThread)
  • release as 0.17.0 (along with some unrelated Scheduler changes)
  • continue working in the future migrating operators to this new design

See #746 for background on the proposal, reasons and design discussion.

- new create signature
- new bind operator
- new subscribe overload
- OperationReplay is failing unit tests, all others are passing
- also simplified implementation to not worry about thread-safety as per Rx contract
- performance improvement from 4,033,468 ops/sec -> 6,731,287 ops/sec
- even though unit tests don’t see it because of SafeObserver, the Take operator should not emit onCompleted more than once
- it’s working … but I can’t figure out the co/contra-variance for the generics! anyone have ideas?
- Changed `bind` signature to match the variant discussed at ReactiveX#746 (comment)
- Updated code to new signature.
- Re-implemented GroupBy operator with `bind`
Found bug while doing Parallel. It was completing prematurely when child groups were asynchronous and delayed.
- forgot to add earlier
It had not been successfully migrated before … this now passes unit tests.
This is the bridge until we port it to the new “bind” model.
@cloudbees-pull-request-builder

RxJava-pull-requests #681 FAILURE
Looks like there's a problem with this pull request

@daveray
Copy link
Contributor

daveray commented Jan 21, 2014

I'll try to take a look at the Clojure interop tomorrow. Is OnSubscribeFunc now dead replace by just Action1 to create()?

@benjchristensen
Copy link
Member Author

@daveray Yes, OnSubscribeFunc is on its way out. As of now it is just Action1.

It is like this:

        new Action1<Operator<? super String>>() {

            @Override
            public void call(Operator<? super String> t1) {

            }

        }

I'm open to giving an interface name for it if there is agreement. For example:

public static interface OnSubscribe<T> extends Action1<Operator<? super T>>
        new OnSubscribe<String>() {

            @Override
            public void call(Operator<? super String> t1) {

            }

        }

This was done for OnSubscribeFunc to make it so people don't have to type in the full name including generics Action1<Operator<? super T>> and instead only have to type OnSubscribe<String> and let auto-completion do the rest.

In other words, it is currently this:

Action1<Operator<? super String>>

but could be this:

OnSubscribe<String>

Anyone have strong opinions on this?

@cloudbees-pull-request-builder

RxJava-pull-requests #682 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member Author

Signature items for discussion:

1) Operator

There is a new class called rx.Operator with this signature:

public abstract class Operator<T> implements Observer<T>, Subscription

Can anyone else think of a better name? Is there anything wrong with this signature?

It has two constructors:

    protected Operator(Operator<?> op)
    protected Operator(CompositeSubscription cs)

Any issues with these? I'm somewhat concerned the constructor taking an Operator will confuse people, but it is very useful when implementing operators. The reason I don't use the Operator.create methods more heavily is because extending is far nicer for the implementations since generally someone wants to invoke unsubscribe() or isUnsubscribed() and that can't be done from within an Observer implementation passed into Operator.create.

2) OnSubscribe or Action1<Operator<? super String>>

Should we leave the Observable() constructor and create methods using the raw function Action1<Operator<? super String>> or should we give it a type such as OnSubscribe that simplifies what people need to type, particularly the generics?

If we do give it a type:

a) what name should it have?
b) should it be an inner class of Observable like OnSubscribeFunc was?

public void call(Operator<? super T> o) {
for (T i : is) {
if (o.isUnsubscribed()) {
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to emit onCompleted when unsubscription happened?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debated back and forth with myself but I think you're right.

The take operator will invoke onCompleted downwards, and unsubscribe upwards. Anything that needs to cleanup or stop should pay attention to unsubscribe, others like map don't care about unsubscribe or onComplete.

Thus I agree with you and will remove the onCompleted.

It's more complicated on groupBy as it is managing multiple subscriptions, and once the inner are unsubscribed the outer must onComplete. The outer being unsubscribed on the other hand wouldn't emit onComplete.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear:

for (T i : is) {
    if (o.isUnsubscribed()) {
        return;
    }
    o.onNext(i);
}
if (!o.isUnsubscribed()) {
   o.onCompleted();
}

@benjchristensen
Copy link
Member Author

it seems you don't need to close over the parentSubscription as it is accessible as the cs field, no

@duarten You are correct. Thanks for the review. I'll post an updated commit shortly. It's clearer without closing over it.

@duarten
Copy link

duarten commented Jan 21, 2014

this shouldn't be an issue as observeOn already takes care of this case, regardless the intention of the exception.

Can you elaborate on this? Unless I'm missing something, this would mean that there needs to be some other mechanism to actually propagate the unsubscription, because the call stack is lost and so are the exception handlers.

The problem with returning a boolean is the same as propagating exceptions. Because there is no constraint on where the onNext call is executed, we can't assume we will be able to use the stack to communicate with the upstream operators.

I like the current approach of Subscriptions as cancellation tokens.

@headinthebox
Copy link
Contributor

interface Observer {
boolean onNext(T value);
void onError(Throwable t);
void onCompleted();
}

That does not sound right at all: >> boolean << onNext(T value);

@akarnokd
Copy link
Member

I meant if you use source.observeOn(cpu).subscribe(v -> throw ex) it will unsubscribe from the source.

Otherwise, you are right, one would need to move the boolean return value into the heap, so it basically becomes the subscription.

class ObserveOn.Observer {
  Observer original;
  CompositeSubscription csub = ...
  boolean onNext(T t) {
      csub.add(scheduler.schedule(() -> {
          if (!original.onNext(t)) {
             csub.unsubscribe();
          }
      }));
      return !csub.isUnsubscribed();
  }
}

That does not sound right at all: >> boolean << onNext(T value);

I would say it is the dualization of break and continue of the Iterator's usage pattern.

@benjchristensen benjchristensen mentioned this pull request Jan 21, 2014
@benjchristensen
Copy link
Member Author

This new approach feels a bit odd

@akarnokd It took me a bit to get used to the new mental model as well while implementing operators as the flow is inverted from before.

However, the benefits are pretty convincing and I have come to prefer the model after implementing several operators.

@benjchristensen
Copy link
Member Author

One last comment about stopping the upstream in the current setup could be achieved via custom exception (or CancellationException) thrown from onNext and specifically ignored in onError of the same observable. It is similar to the case when one needs to stop a periodic action scheduled on the thread pool and doesn't have access to the Future.

This should already be working due to how we catch errors and propagate via onError until they hit the last Observer wrapped in SafeObserver which will call unsubscribe.

That said, what benefit does this give us as a programming model? Using the composed Subscription and Observer.onError is elegant and works well. I consider throwing exceptions in this model to be "exceptional" - as in it wasn't expected and is out-of-band and breaking the normal contract.

then one could have a more cooperative observer where onNext returns boolean to indicate continuation/stop

Similarly, what does this give us that the Subscription composition does not already give us?

On both of these, I'm not quite understanding what problems you are seeking to solve. Can you elaborate?

@cloudbees-pull-request-builder

RxJava-pull-requests #690 FAILURE
Looks like there's a problem with this pull request

@akarnokd
Copy link
Member

The same issue as this new model: to stop from() or repeat() like producers in downstream operators like take() or one's Observer at the end of the chain.
The benefit it gives is the lack of Subscription overhead when used solely as cancellation token.

Edit: i suggest removing the failing test for the time being until a more load-insensitive variant is made.

@benjchristensen
Copy link
Member Author

The benefit it gives is the lack of Subscription overhead when used solely as cancellation token.

By overhead are you referring to object allocation or memory? To achieve composition do you envision some operators would need to try/catch(CancellationException) around onNext and then conditionally re-throw, similar to how GroupBy must separate the subscriptions?

Generally though, I don't like the idea of using exceptions for control flow. Unsubscribe is not an uncommon event (such as with take) so using it for control flow seems very inappropriate (and I don't know what the performance implications are).

@akarnokd
Copy link
Member

The benefit it gives is the lack of Subscription overhead when used solely as cancellation token.

I meant the boolean onNext() here.

For the exception case, there is no need for extra try catch as exceptions will propagate back down on the onError wire, breaking out of loops and triggering unsubscriptions along the way. In terms of a TakeObserver:

onNext { if (count >= limit) { o.onCompleted(); throw CancellationException(); } o.onNext(v); }
onError { if (e instanceof CancellationException) return; o.onError(e); }

But Exceptions are quite expensive compared to a simple boolean return value indeed.

@benjchristensen
Copy link
Member Author

For the exception case, there is no need for extra try catch as exceptions will propagate back down on the onError wire, breaking out of loops and triggering unsubscriptions along the way.

The TakeObserver is a very simple use case. I started with a single Subscription for the entire sequence in my bind prototyping but found it insufficient. There are various use cases (such as the one demonstrated by groupBy) that require decoupling the subscriptions. For this reason the CancellationException would have to be caught and not permitted to pass all the way.

boolean onNext()

I played with these types of return signatures for a while trying to see if I could compose the back pressure channel but found it didn't compose well in things like merge and zip. I never considered it for replacing Subscription though.

The problem I see with using boolean onNext to replace Subscription is that it requires an onNext in order to be applied. Thus it doesn't work on an Observable that never emits, or is very slow to emit but should be shut down (unsubscribed and resources cleaned up ... such as a network connection) or operators like timeout that need to fire an unsubscribe that is not related to an onNext occurring.

@benjchristensen
Copy link
Member Author

Merging. Discussion can continue here as the code will surely evolve on the master branch before we release.

I have opened issue #775 for discussing the naming and type signatures.

benjchristensen added a commit that referenced this pull request Jan 22, 2014
@benjchristensen benjchristensen merged commit 1573df9 into ReactiveX:master Jan 22, 2014
@benjchristensen benjchristensen deleted the bind branch January 22, 2014 04:13
@akarnokd
Copy link
Member

Practically, the signature boolean onNext(T value) is the same as void onNext(T value); boolean isUnsubscribed() where the former's boolean return value is refactored into a separate method. In both cases, one needs a heap object to cross thread boundaries and complex operators.

@benjchristensen
Copy link
Member Author

boolean isUnsubscribed()

The difference is that Operator has more functionality than just isUnsubscribed, notably the add(Subscription) method which allows registering callbacks to handle the use cases I mentioned. If those use cases didn't exist, then sure, it would work.

@headinthebox
Copy link
Contributor

Really it is a composite subscription. Which is probably fine to have a the base class/interface for subscriptions.

(one of the side effects of this exercise is that I hope we can junk most of the subscription types ...)

@benjchristensen
Copy link
Member Author

Yes it is a CompositeSubscription (without clear and remove). Hence the reason why Observer implementing Subscription and expecting users to implement these methods is not correct.

@akarnokd
Copy link
Member

Wouldn't we need the remove(), for example, in groupByUntil to unsubscribe and remove groups? I guess one could always have its on Composite and use add(), but how often do we need the ability to add multiple sub-subscriptions in the first place. I think Rx.NET's Sink class, the closest equivalent uses SingleAssignmentSubscription for it.

@benjchristensen
Copy link
Member Author

I'm arguing for the fact that CompositeSubscription exists for good reason and the Observer&Subscription use case the Operator solves is not the same.

@headinthebox
Copy link
Contributor

I'd say there are 4 scenarios for API users (as opposed to API implementors)

(a) you pass in the individual methods and we create an observer for you.
(b) you call Observer.create and we create one for you.
(c) you get an observer from somewhere else, say a subject and you pass that on.
(d) you really want to implement one by hand, then you can use ObserverBase, or
(e) You thin you are smart, then shoot yourself in the foot.

@headinthebox
Copy link
Contributor

SingleAssignmentSubscription is not a friendly type. I bet you can replace SingleAssignmentDisposable with MultipleAssignmentDisposable in the whole Rx.NET code base and it will still work (caveat emptor)

@akarnokd
Copy link
Member

Excuse my short memory, but what problems do we want to solve with this new create/bind/subscribe approach?

@benjchristensen
Copy link
Member Author

#746

  1. Synchronous Unsubscribe
  2. Custom Operator Chaining
  3. Simpler Operator Implementations
  4. Eliminate Need for CurrentThreadScheduler
  5. Recursion/Loop Performance

@akarnokd
Copy link
Member

Thanks.

I was looking at ways to implement repeat() but I've stumbled upon a problem:

return from(Arrays.asList(this)).bind((u, k) -> new Observer<Obsurvable<T>>() {
            @Override
            public void onNext(Obsurvable<T> args) {
                while (!k.isUnsubscribed()) {
                    args.subscribe(new Observer<T>() {
                        @Override
                        public void onNext(T args) {
                            u.onNext(args);
                        }
                        @Override
                        public void onError(Throwable e) {
                            u.onError(e);
                        }
                        @Override
                        public void onCompleted() {
                        }
                    }, k);
                }
            }
            @Override
            public void onError(Throwable e) {
                u.onError(e);
            }
            @Override
            public void onCompleted() {
            }
        });

It works if the source is a synchronous from:

from(Arrays.asList(1)).repeat().take(10).subscribe(System.out::println);

But starts to spin rapidly if one uses subscribeOn (which may lead to OOM and is just wasteful):

from(Arrays.asList(1)).subscribeOn(Schedulers.computation()).repeat().take(10).subscribe(System.out::println);

I did some implementation experiments and it appears one would always need some queue to avoid recursion.

In addition, I felt I don't need to combine Observer & Subscription but to close over a CompositeSubscription, where I can add/remove dependent subscriptions at will.

@benjchristensen
Copy link
Member Author

I haven't spent the time to play with it, but it seems to me that the repeat should not occur until onComplete is called.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants