Observable interface #744

Closed
wants to merge 12 commits into from

rickbw
Contributor

@rickbw rickbw commented Jan 13, 2014

The core purposes of the Observable type—visitation of elements, detection of completion, and detection of errors—are currently tangled with a set of fluent utility methods. This tangling makes the core contract of observability harder to understand, and it makes sub-typing less straightforward and less flexible.

To resolve this situation, I have refactored an interface out of Observable, which I have tentatively dubbed IObservable. (Ideally, the interface itself would be called Observable, and the class FluentObservable. See Guava's FluentIterable alongside the JDK's Iterable for another example of this pattern. However, I did not want to break backward compatibility.) All public methods now accept IObservable instead of Observable in their argument lists, while those that returned Observable continue to do so. This pattern gives the best of both worlds: any IObservable implementation can be used anywhere Observable and its subclasses can, and we don't lose the convenience of the fluent API. In particular, any IObservable can be converted into an Observable via the new method Observable.from(IObservable).

This change should be 100% backwards-compatible at compile time, though the changed signatures mean that it is not backwards-compatible at the binary level. I have observed a number of breaking API changes on master over the last few weeks, so hopefully that is not a deal breaker. If it is, we can always restore overloads that accept Observable in addition to IObservable, but that seemed like a pretty large amount of duplication and complexity for a small benefit, so I have not done it so far.
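The difference between source-level and binary-level compatibility described above can be illustrated with a small sketch. The types below are hypothetical stand-ins, not the RxJava classes, and the reflective lookup by exact parameter type is only an approximation of how the JVM links an already-compiled call site against a method descriptor.

```java
// Hypothetical stand-ins for the types under discussion; not the RxJava sources.
interface IObservable<T> {}

class Observable<T> implements IObservable<T> {}

class Operators {
    // After the refactoring, the parameter is widened from Observable to IObservable.
    static String describe(IObservable<?> source) {
        return "accepted " + source.getClass().getSimpleName();
    }
}

class CompatibilityDemo {
    // Source compatibility: a call written against the old signature still compiles,
    // because an Observable is also an IObservable.
    static String callWithObservable() {
        return Operators.describe(new Observable<String>());
    }

    // Binary compatibility: callers compiled earlier refer to describe(Observable).
    // A lookup by that exact parameter type no longer finds a method.
    static boolean oldDescriptorExists() {
        try {
            Operators.class.getDeclaredMethod("describe", Observable.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithObservable());
        System.out.println("old descriptor present: " + oldDescriptorExists());
    }
}
```

Recompiling callers against the new signature resolves the mismatch, which is why the change is invisible in source but not to previously built jars.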

rickbw added 11 commits January 8, 2014 23:36
The IObservable interface supports the core contract of observability: handling of each element, detection of completion, and detection of errors. The concrete Observable class is now focused on its rightful role: providing a set of highly usable fluent utilities for working with IObservables. For another example of this pattern, see e.g. Guava’s FluentIterable vs. the JDK’s Iterable. (If not for backward compatibility, Observable might have been called “FluentObservable” and IObservable simply “Observable”.)
Utilities outside of the fluent Observable class are now more reusable as a result.
…rationSubscribe class.

This refactoring makes these subscribe(…) forms more reusable with IObservables that aren’t of the concrete type Observable. See the changes in ChunkedOperation as an example.
Merged as of commit 'caeaf58d58c52b1028b7d4ec2d34dfc9b0baeb4d'.

Conflicts:
	rxjava-core/src/main/java/rx/Observable.java
	rxjava-core/src/main/java/rx/joins/JoinObserver1.java
	rxjava-core/src/main/java/rx/joins/Pattern1.java
	rxjava-core/src/main/java/rx/joins/Pattern2.java
	rxjava-core/src/main/java/rx/joins/Pattern3.java
	rxjava-core/src/main/java/rx/observables/BlockingObservable.java
	rxjava-core/src/main/java/rx/operators/ChunkedOperation.java
	rxjava-core/src/main/java/rx/operators/OperationBuffer.java
	rxjava-core/src/main/java/rx/operators/OperationConcat.java
	rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java
	rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java
	rxjava-core/src/main/java/rx/operators/OperationJoin.java
	rxjava-core/src/main/java/rx/operators/OperationJoinPatterns.java
	rxjava-core/src/main/java/rx/operators/OperationLast.java
	rxjava-core/src/main/java/rx/operators/OperationMap.java
	rxjava-core/src/main/java/rx/operators/OperationMostRecent.java
	rxjava-core/src/main/java/rx/operators/OperationMulticast.java
	rxjava-core/src/main/java/rx/operators/OperationNext.java
	rxjava-core/src/main/java/rx/operators/OperationObserveOn.java
	rxjava-core/src/main/java/rx/operators/OperationToMap.java
	rxjava-core/src/main/java/rx/operators/OperationToMultimap.java
	rxjava-core/src/main/java/rx/operators/OperationWindow.java
Conflicts:
	rxjava-contrib/rxjava-computation-expressions/src/main/java/rx/operators/OperationConditionals.java
	rxjava-core/src/main/java/rx/Observable.java
	rxjava-core/src/main/java/rx/observables/BlockingObservable.java
	rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java
	rxjava-core/src/main/java/rx/operators/OperationParallel.java
	rxjava-core/src/main/java/rx/plugins/RxJavaErrorHandler.java
@cloudbees-pull-request-builder

RxJava-pull-requests #659 SUCCESS
This pull request looks good

@benjchristensen
Member

The library originally had an Observable interface and associated implementation. It was removed because in practice it does not work due to all methods hanging off of Observable. 100+ methods on an interface all needing complex implementations and by design intended to comply with a specific contract made alternate implementations impractical and thus the interface was noise.

If we had extension methods like C# we would design it with a pure interface like Rx.Net, but Java doesn't support that.

Observables should not be created via inheritance, they are created via Observable.create. Thus, the OnSubscribeFunc is the light-weight interface anything can implement and then become an Observable for application of operators.

@rickbw
Contributor Author

rickbw commented Jan 13, 2014

I agree that an interface with hundreds of methods would be ungainly and inappropriate. It would also not meet the goal of separating the core contract of observability from the utility methods, and it is not what I have done. (In my opinion, if observability cannot be expressed as concisely as iterability, the API has room for improvement.) The IObservable interface contains only a single method: subscribe(Observer). Please review my first commit, rickbw@bcb2a60; I think you will see what I mean.

Here's an example:

class MyObservable implements IObservable<String> {
    public Subscription subscribe(Observer<? super String> ob) {
        return new MySubscription();
    }
}

IObservable<String> obs = new MyObservable();
Observable.from(obs).multicast(...);

For another example of the same pattern, see http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/FluentIterable.html.

The reason that OnSubscribeFunc works as well as it does is that it does what an (I)Observable interface itself should do. My next task will be to replace OnSubscribeFunc with IObservable. This should be trivial, since the only difference between them is a difference in the method name: subscribe vs. onSubscribe. Look for an update to this pull request soon.

@akarnokd
Member

One could argue that having the separate OnSubscribeFunc is also unnecessary as the single parameter callback could be done via Func1<Observer<? super T>, Subscription>. If you look at my reactive4java library, I implemented it with Observable interface and static methods on the Reactive class. This was later extended to have an ObservableBuilder to allow the fluent usage. However, after joining RxJava, I can now see that merging all these things into a class-based Observable is a better choice, even Java 8 default methods can't really compete with it.

@benjchristensen
Member

What are you trying to achieve from this change?

There is very little value in an IObservable interface with a single subscribe method on it. The value of rx.Observable is all the operators, not the simple pub/sub interfaces.

If there is just an IObservable.publish() everyone ends up working with Observable and ignoring the IObservable otherwise they always have to convert from IObservable to Observable. That's what we had in our code base at Netflix originally and the interface was useless and a nuisance.

The purpose of OnSubscribeFunc was to provide a simple, composable API for observability. IObservable already does that, and with the same method signature, so there’s no need for a second API for the same thing. There are two benefits for the API:
1. The syntax for composing operations is cleaner, with less overhead in syntax and in object count. Instead of Observable.create(op(create(arg1), create(arg2), create(arg3))), you can now simply write op(arg1, arg2, arg3).
2. The Operation classes are more consistent with one another. Previously, some of them provided actual Observables, and others merely OnSubscribeFunc. Which one was expected when wasn’t particularly clear, and it was up to the caller to wrap or unwrap the object as needed. Now, every Operation provides IObservables of one concrete type or another.
@cloudbees-pull-request-builder

RxJava-pull-requests #672 FAILURE
Looks like there's a problem with this pull request

@rickbw
Contributor Author

rickbw commented Jan 15, 2014

I have completed replacing OnSubscribeFunc with IObservable. I will resolve the merge conflicts next.

Ben, you write that there is little value in an observable interface with a single subscribe method on it. Yet in the master branch, that is exactly what OnSubscribeFunc is, and the whole API is built on it. Likewise, the value in the Java Collections API isn't in the Iterable interface, which is just hasNext() and next(), yet the entire library is built on it. So I don't see it as a question of moving or removing value; I see it as a question of the separation of concerns. When I hear that Iterable is a good tool for building incremental pull-based algorithms, I go to the docs and I see a concise API that I can immediately understand. When I hear that Observable is a push-based Iterable analog, I go to the docs and see dozens of methods, related and unrelated.
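For reference, in the JDK the only abstract method on Iterable is iterator(); hasNext() and next() are declared on the Iterator it returns. The sketch below shows how small that pull-based contract is. The class name CountTo and the helper takeWhileBelow are made up for illustration, and Java 8 or later is assumed so that Iterator.remove() has a default implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical example type: yields 1..limit lazily, one element per pull.
class CountTo implements Iterable<Integer> {
    private final int limit;

    CountTo(int limit) {
        this.limit = limit;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int next = 1;

            @Override
            public boolean hasNext() {
                return next <= limit;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return next++;
            }
        };
    }

    // The consumer pulls elements one at a time and may stop early,
    // without the source ever materializing a full collection.
    static List<Integer> takeWhileBelow(Iterable<Integer> source, int bound) {
        List<Integer> out = new ArrayList<>();
        for (int value : source) {
            if (value >= bound) {
                break;
            }
            out.add(value);
        }
        return out;
    }
}
```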

My goal is to make the API easier to understand and compose by separating concerns: the ability to observe something is different from what algorithms you might devise if you had the ability to do so. IObservable is the former; Observable is the latter. The fluent API has not been sacrificed in any way; in fact it is lighter and easier to understand now. Previously, some operations provided Observables and others provided OnSubscribeFuncs. This was frustrating, since the documentation for Observable.create(OnSubscribeFunc) said that OnSubscribeFunc was required to behave just like Observable itself anyway, and the latter objects had to be wrapped in the former. So the code was full of stuff like: create(op(create(arg1), create(arg2), create(arg3))). Now all operators return IObservables of one concrete type or another, and they can all be composed in the same way, so you can just write op(arg1, arg2, arg3).

I understand the concern about having to constantly wrap stuff, and I can't speak to your code base. But I can say from working with the RxJava code base that I have actually removed a lot more calls to Observable.create(OnSubscribeFunc) than I have added calls to Observable.from(IObservable). In fact many operators no longer depend on Observable at all anymore, because all they need to do is subscribe.

@benjchristensen
Member

My goal is to make the API easier to understand and compose by separating concerns

Please give me an example of something that doesn't compose today.

the Iterable interface, which is just hasNext() and next()

Practically no code ever exposes or uses Iterable directly. It is always a List or Collection that is used.

I understand the concern about having to constantly wrap stuff

Returning IObservable from application code is not the solution as that means all code bases will have to wrap it into an Observable for it to be any use.

I have completed replacing OnSubscribeFunc with IObservable. I will resolve the merge conflicts next.

Please don't spend time on this while there is still not agreement on the approach being taken. What you are suggesting be changed is something we actively moved away from over a year ago thus it would require several people all deciding to reverse course.

I am also not happy with the IObservable interface convention. That is non-idiomatic in Java. We have purposefully not included the "I" in front of interfaces such as Observer, Scheduler, Subscription as that is a .Net convention, not Java.

Lastly, the possible migration to using bind would further change this discussion and certainly the implementation: #746

@rickbw
Contributor Author

rickbw commented Jan 17, 2014

Returning IObservable from application code is not the solution

Agreed. I am not suggesting returning IObservable instead of Observable and have not made any such change. I am very much in favor of the fluent API and would not want to damage it. I am suggesting accepting IObservable in place of Observable in argument lists. I am also suggesting returning IObservable in place of OnSubscribeFunc, since the latter is already required to behave exactly like Observable anyway.

Please give me an example of something that doesn't compose today.

For example, see OperationCombineLatestTest in master, which looks like this:

Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
combineLatestW.subscribe(w);

...and in this pull request, which looks like this:

IObservable<String> combineLatestW = combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction());
combineLatestW.subscribe(w);

As you can see, objects which are required to behave like observables can now actually be used as observables. There are not more wrapper objects; there are fewer. Of course, you'll have to tell me the extent to which your application code resembles your test code.

Practically no code ever exposes or uses Iterable directly.

That's an interesting perspective; it's not my experience. I've used Iterable frequently at a couple of different employers. It's great for incremental processing of medium-to-large data sets, where you want to encourage clients to process elements one at a time and discourage waiting for a large fetch to happen and loading the whole result set into memory.

I am also not happy with the IObservable interface convention.

Agreed. I wanted something short that suggested that the object met the core contract of observability, and I couldn't come up with anything better. If we come to an agreement on the approach, then it will be trivial to change the name to whatever you like.

Thanks for your engagement on this request.

@rickbw
Contributor Author

rickbw commented Jan 17, 2014

Re: bind, you're right that it would have significant implications for the specifics of the API. However, the general approach of distinguishing the fundamental contract in Observable, and accepting any object with that contract in create and other methods, would be unchanged.

From what I see in your gist, the fundamental operation would no longer be subscribe(Observer<T>); it would be observe(Observer<T>, Func0<OperatorSubscription>). You've redefined the former in terms of the latter. Therefore, observe is what I'd want to extract into an interface.

Your proposed successor to OnSubscribeFunc is Action2<Observer<T>, OperatorSubscription>. Once again, there is a near-perfect symmetry between the input/output signature for that type and for the fundamental method of Observable itself. (You've designed APIs for Observable with that symmetry twice now; I must be on to something. :-) ) The only difference is that there seems to be some discussion around whether observe should accept OperatorSubscription or Func0<OperatorSubscription>. I would suggest that whichever you choose, you keep the argument to create and the signature of observe consistent with each other. That is, go with either Action2<Observer<T>, Func0<OperatorSubscription>> and observe(Observer<T>, Func0<OperatorSubscription>) or Action2<Observer<T>, OperatorSubscription> and observe(Observer<T>, OperatorSubscription)

@benjchristensen
Member

Considering the current state of the master branch, what would you propose to be done differently?

The signatures are now this:

// Observable.create
public final static <T> Observable<T> create(OnSubscribe<T> f)

// Observable.OnSubscribe typed function interface
public static interface OnSubscribe<T> extends Action1<Observer<? super T>>

// lift function
public <R> Observable<R> lift(final Func1<Observer<? super R>, Observer<? super T>> bind)

// Observer
public abstract class Observer<T> implements Subscription {
     public abstract void onNext(T t);
     public abstract void onError(Throwable e);
     public abstract void onCompleted();
     public final void add(Subscription s)
     public final void unsubscribe()
     public final boolean isUnsubscribed()
}

// Subject
public abstract class Subject<T, R> extends Observer<T> {
    public abstract Observable<R> toObservable();
}
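
As an illustration of how the create and lift signatures above fit together, here is a minimal sketch. Every type in it is a simplified stand-in written for this example, not the library code: Observer omits the Subscription methods, Func1 and OnSubscribe are redeclared locally, and the helper LiftDemo.map is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the listed signatures; subscription handling is omitted.
interface Func1<A, B> {
    B call(A a);
}

interface OnSubscribe<T> {
    void call(Observer<? super T> observer);
}

abstract class Observer<T> {
    public abstract void onNext(T t);
    public abstract void onError(Throwable e);
    public abstract void onCompleted();
}

class Observable<T> {
    private final OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(f);
    }

    void subscribe(Observer<? super T> observer) {
        onSubscribe.call(observer);
    }

    // lift: map the downstream Observer<R> to an upstream Observer<T>,
    // then hand that upstream observer to this source.
    <R> Observable<R> lift(final Func1<Observer<? super R>, Observer<? super T>> bind) {
        final OnSubscribe<T> upstream = onSubscribe;
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Observer<? super R> downstream) {
                upstream.call(bind.call(downstream));
            }
        });
    }
}

class LiftDemo {
    // A map operator expressed purely as an Observer-to-Observer function.
    static <T, R> Func1<Observer<? super R>, Observer<? super T>> map(final Func1<T, R> fn) {
        return new Func1<Observer<? super R>, Observer<? super T>>() {
            @Override
            public Observer<? super T> call(final Observer<? super R> downstream) {
                return new Observer<T>() {
                    @Override public void onNext(T t) { downstream.onNext(fn.call(t)); }
                    @Override public void onError(Throwable e) { downstream.onError(e); }
                    @Override public void onCompleted() { downstream.onCompleted(); }
                };
            }
        };
    }

    static List<String> run() {
        Observable<Integer> numbers = Observable.create(new OnSubscribe<Integer>() {
            @Override
            public void call(Observer<? super Integer> o) {
                o.onNext(1);
                o.onNext(2);
                o.onCompleted();
            }
        });
        Func1<Integer, String> label = new Func1<Integer, String>() {
            @Override public String call(Integer n) { return "#" + n; }
        };
        Func1<Observer<? super String>, Observer<? super Integer>> op = map(label);
        Observable<String> labels = numbers.lift(op);
        final List<String> out = new ArrayList<>();
        labels.subscribe(new Observer<String>() {
            @Override public void onNext(String s) { out.add(s); }
            @Override public void onError(Throwable e) { throw new IllegalStateException(e); }
            @Override public void onCompleted() { out.add("done"); }
        });
        return out;
    }
}
```

In this sketch the operator never touches an Observable; it only transforms observers, and lift is the one place where the result is wrapped back into an Observable.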

@rickbw
Contributor Author

rickbw commented Jan 30, 2014

Bear with me, because I haven't been able to follow all of the reasoning that has gone into the changes in between when last we talked and the current state. We might have to iterate here. But here are my thoughts:

I think notionally, there's an object that represents a stream of data (Observable), an object that consumes that data (Observer), and a third that represents that association between them—that is, the state of the stream (Subscription). If we analogize with iteration, an Observable is like an Iterable, an Observer is like the block of code inside the for loop, and a Subscription is like the state of the iteration: you can start iterating, and you can stop iterating whenever you want, even if the data stream has more elements in it.

Each of these objects has a really simple job to do, so it should be possible to describe each with an interface of just a couple of methods. When you write a for loop, there's a moment when you start the stream by "passing" the block of code to execute for each element. Therefore, Observable needs a method to start pushing data that accepts the Observer (subscribe). Like in a loop, that block of code can exit the iteration any time, so the Observer needs to be passed the Subscription as well as the data. Unlike in a loop, the stream may deliver elements asynchronously, so the party that started the iteration may also want to stop the iteration; therefore subscribe should also return the Subscription. All that yields the following:

// Note similarities to cancellation of Futures
public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

public interface Observer<T> {
    void onNext(T t, Subscription s);
    // Could pass Subscription here too, but by definition "iteration" is ending.
    void onError(Throwable e);
    void onCompleted();
}

public interface Observable<T> {
    Subscription subscribe(Observer<? super T> o);
}
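
To make the loop analogy concrete, here is a minimal sketch of the three interfaces in use. The interfaces are repeated from the listing above so that the example is self-contained; ListObservable and TakingObserver are hypothetical names, and the source is synchronous purely for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

interface Observer<T> {
    void onNext(T t, Subscription s);
    void onError(Throwable e);
    void onCompleted();
}

interface Observable<T> {
    Subscription subscribe(Observer<? super T> o);
}

// Hypothetical synchronous source: pushes list elements until done or unsubscribed.
class ListObservable<T> implements Observable<T> {
    private final List<T> items;

    ListObservable(List<T> items) {
        this.items = items;
    }

    @Override
    public Subscription subscribe(Observer<? super T> o) {
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        Subscription s = new Subscription() {
            @Override public void unsubscribe() { cancelled.set(true); }
            @Override public boolean isUnsubscribed() { return cancelled.get(); }
        };
        try {
            for (T item : items) {
                if (s.isUnsubscribed()) {
                    return s; // the observer "broke out of the loop"
                }
                o.onNext(item, s);
            }
            if (!s.isUnsubscribed()) {
                o.onCompleted();
            }
        } catch (RuntimeException e) {
            o.onError(e);
        }
        return s;
    }
}

// Hypothetical observer: collects elements and unsubscribes after `limit` of them.
class TakingObserver<T> implements Observer<T> {
    final List<T> seen = new ArrayList<>();
    boolean completed;
    private final int limit;

    TakingObserver(int limit) {
        this.limit = limit;
    }

    @Override
    public void onNext(T t, Subscription s) {
        seen.add(t);
        if (seen.size() >= limit) {
            s.unsubscribe(); // analogous to `break` inside a for loop
        }
    }

    @Override public void onError(Throwable e) { throw new IllegalStateException(e); }
    @Override public void onCompleted() { completed = true; }
}
```

Subscribing a TakingObserver with a limit of 2 to a four-element ListObservable delivers two elements and no completion signal, and the returned Subscription reports that it has been unsubscribed.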

Then you need a concrete class to provide a library of useful methods for operating on Observables, in a fluent API style. If I were starting the API from scratch, I would call this FluentObservable, because it's descriptive, and it's analogous to Guava's FluentIterable, with which I'm quite familiar. I would accept plain interface Observables in the argument lists of this class, to make the API as flexible as possible, and I would return FluentObservable, to make chaining as easy as possible. That yields:

public class FluentObservable<T> implements Observable<T> {
    private Observable<? extends T> delegate; // ...initialize in constructor, not shown

    // Replaces create(OnSubscribe):
    public static <T> FluentObservable<T> from(Observable<? extends T> delegate) {
        // ...return argument if already Fluent else return new wrapper
    }

    public <X> FluentObservable<X> combineInSomeWay(Observable<X> other) { ... }
    // Lots more...
}
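
A fuller version of this outline might look like the sketch below. It is only an illustration under stated assumptions: the delegate is typed as Observable<? extends T> so that delegation type-checks, map stands in for the "lots more" operators, java.util.function.Function (Java 8) is used for brevity, and FluentDemo with its words() source is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

interface Observer<T> {
    void onNext(T t, Subscription s);
    void onError(Throwable e);
    void onCompleted();
}

interface Observable<T> {
    Subscription subscribe(Observer<? super T> o);
}

class FluentObservable<T> implements Observable<T> {
    private final Observable<? extends T> delegate;

    private FluentObservable(Observable<? extends T> delegate) {
        this.delegate = delegate;
    }

    // Returns the argument if it is already fluent, otherwise a new wrapper.
    @SuppressWarnings("unchecked")
    static <T> FluentObservable<T> from(Observable<? extends T> source) {
        if (source instanceof FluentObservable) {
            return (FluentObservable<T>) source;
        }
        return new FluentObservable<T>(source);
    }

    @Override
    public Subscription subscribe(Observer<? super T> o) {
        return delegate.subscribe(o);
    }

    // One representative operator; a real class would carry many more.
    <R> FluentObservable<R> map(final Function<? super T, ? extends R> fn) {
        final Observable<? extends T> upstream = delegate;
        return new FluentObservable<R>(new Observable<R>() {
            @Override
            public Subscription subscribe(final Observer<? super R> downstream) {
                return upstream.subscribe(new Observer<T>() {
                    @Override public void onNext(T t, Subscription s) { downstream.onNext(fn.apply(t), s); }
                    @Override public void onError(Throwable e) { downstream.onError(e); }
                    @Override public void onCompleted() { downstream.onCompleted(); }
                });
            }
        });
    }
}

class FluentDemo {
    // A plain, non-fluent source that emits two strings synchronously.
    // It ignores unsubscription for brevity.
    static Observable<String> words() {
        return new Observable<String>() {
            @Override
            public Subscription subscribe(Observer<? super String> o) {
                Subscription s = new Subscription() {
                    private boolean done;
                    @Override public void unsubscribe() { done = true; }
                    @Override public boolean isUnsubscribed() { return done; }
                };
                o.onNext("rx", s);
                o.onNext("java", s);
                o.onCompleted();
                return s;
            }
        };
    }

    static List<Integer> lengths() {
        final List<Integer> out = new ArrayList<>();
        FluentObservable.from(words()).map(String::length).subscribe(new Observer<Integer>() {
            @Override public void onNext(Integer n, Subscription s) { out.add(n); }
            @Override public void onError(Throwable e) { throw new IllegalStateException(e); }
            @Override public void onCompleted() { }
        });
        return out;
    }
}
```

The single from call at the start of the chain is the only wrapping step; everything after it stays in the fluent type.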

Your lift method would be unchanged, I think.

Subscription.add looks like a Composite design pattern trying to get out. I would define a CompositeSubscription class that implements Subscription and also composes other Subscriptions, and handles the multiplexing among them.
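
A minimal sketch of what such a composite could look like follows. It is written from scratch for this example and makes its own assumptions: a child added after the composite has been unsubscribed is unsubscribed immediately, and FlagSubscription is a hypothetical helper used only to observe the effect.

```java
import java.util.ArrayList;
import java.util.List;

interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

// Hypothetical leaf subscription that simply records whether it was cancelled.
class FlagSubscription implements Subscription {
    private volatile boolean unsubscribed;

    @Override public void unsubscribe() { unsubscribed = true; }
    @Override public boolean isUnsubscribed() { return unsubscribed; }
}

// Composite pattern: one Subscription that fans unsubscribe() out to its children.
class CompositeSubscription implements Subscription {
    private final List<Subscription> children = new ArrayList<>();
    private boolean unsubscribed;

    // Registers a child. If this composite is already unsubscribed,
    // the child is unsubscribed right away instead of being stored.
    void add(Subscription child) {
        synchronized (this) {
            if (!unsubscribed) {
                children.add(child);
                return;
            }
        }
        child.unsubscribe();
    }

    @Override
    public void unsubscribe() {
        List<Subscription> toCancel;
        synchronized (this) {
            if (unsubscribed) {
                return; // repeated calls have no further effect
            }
            unsubscribed = true;
            toCancel = new ArrayList<>(children);
            children.clear();
        }
        // Children are cancelled outside the lock.
        for (Subscription child : toCancel) {
            child.unsubscribe();
        }
    }

    @Override
    public synchronized boolean isUnsubscribed() {
        return unsubscribed;
    }
}
```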

Subject doesn't add any concrete implementation, so I would make it also an interface, extending the Observer interface, but otherwise unchanged from what you've shown.

Is that description clear? What do you think?

@benjchristensen
Member

Thank you for the detailed thoughts on this. I agree with your assessment that Observable, Observer and Subscription have very simple, clear definitions at their core and that their jobs are pretty straightforward.

Where I view things differently is that the Observable interface is of little value in practice if it is solely this:

public interface Observable<T> {
    Subscription subscribe(Observer<? super T> o);
}

Everyone would end up using FluentObservable only which defeats the purpose of the Observable interface. Remember that we started the library like that. We literally had IObservable and Observable matching what Rx.Net has. But since we don't have extension methods it doesn't work.

With Observable containing only subscribe, no API will ever return it like this:

public Observable getData()

They will instead return the type that creates all the value:

public FluentObservable getData()

otherwise all consumption would end up being:

FluentObservable.from(getData()).map(...)

instead of:

getData().map(...)

On real-world code with dozens of API calls all returning Observable<T> this becomes a mess of FluentObservable.from(getter()) everywhere in the code.

Thus, what you're actually describing is equivalent to what we now have as:

  • Observable.OnSubscribe == your Observable interface
  • Observable == FluentObservable

No API deals in or returns Observable.OnSubscribe which then forces Observable.create being performed. The OnSubscribe type is an internal type, not a public API type, which is what Observable is used as.

In other words, APIs return Observable like this:

    public Observable<SomeData> getData() {
        return Observable.create(new OnSubscribe<SomeData>() {

            @Override
            public void call(Subscriber<? super SomeData> o) {
                o.onNext(new SomeData());
                o.onCompleted();
            }
        });
    }

They do not return the function:

    public OnSubscribe<SomeData> getDataFunction() {
        return new OnSubscribe<SomeData>() {

            @Override
            public void call(Subscriber<? super SomeData> o) {
                o.onNext(new SomeData());
                o.onCompleted();
            }
        };
    }

as it would then need to be turned into an Observable before it's usable.

Now, if these types were being baked into the JDK, my position would be different on having a base interface as it would be a fixed signature that could never be changed once released and all libraries would be interoperating via the same interfaces with different implementations.

For this library however the interface and implementation will always go together and there is not a common lingua-franca between libraries for what represents an "Observable". Interop between libraries will always require a bridge (like the FluentObservable.from()). Within the library though it should work as cleanly as possible with as few types as possible.

These perspectives (and the experience of having tried separate interface/implementation) have shaped the design of RxJava and are why we chose the abstract class Observable to be the core class of the library, intended for exposure in library APIs and user code, rather than a simple interface that needs to be wrapped before use.

Considering these reasons, what specifically are you trying to achieve by making OnSubscribe become the major interface everyone builds APIs against instead of the concrete Observable with all the operators on it? I'd like to understand the use cases you're pursuing.

@rickbw
Contributor Author

rickbw commented Jan 31, 2014

I'm evaluating RxJava for a proposed data-access layer, primarily but not necessarily over HTTP/REST, for eHarmony. That layer would be used in many projects, by multiple teams, so I want to make sure that the API building blocks are as clean and simple as possible—for the sake of usability but also of training and understanding.

You're right that OnSubscribeFunc is the equivalent of what I am calling IObservable. (My proposal boils down to (1) moving this to a top-level type, (2) renaming it and renaming onSubscribe to subscribe, (3) making Observable implement that type, and (4) changing argument lists to accept that type.) Given that, I'm not sure I understand what you mean when you say that such an interface has no value, and is an internal type. My understanding is that absolutely every RxJava user implements OnSubscribeFunc many times, and every time they do, they have to call a wrapper method to use their class. Is that an incorrect impression?

Regarding those wrapper methods (i.e. Observable.create(OnSubscribeFunc) in master), I have counted the calls in the version of master I last merged and in my branch. My branch contains 292 calls; master contains more than twice as many: 664.

Let me turn it around a little, if I may: You clearly recognize the value of a simple one-method interface for your users to implement, because you've given them one: OnSubscribeFunc. The documentation around this interface (mostly attached to Observable.create) requires that it obey the exact same contract as Observable with respect to subscribing. Given that, what is the benefit of keeping those two types in separate type hierarchies, when consolidating them into one requires no change in behavior and has the potential to eliminate a substantial number of wrapper calls? I think we can set aside concerns about returning interfaces and about large interfaces requiring extension methods, since no one is proposing those approaches at this time.

Thanks.

@benjchristensen
Member

Here is what the API has become as of 0.17.0 to achieve the 'lift' and Subscriber benefits. More are likely coming to support continuations (async/await style).

    public final static <T> Observable<T> create(OnSubscribe<T> f);

    /**
     * Invoked when Observable.subscribe is called.
     */
    public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>> 

    /**
     * Operator function for lifting into an Observable.
     */
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> 

    public final Subscription subscribe(Subscriber<? super T> observer);

what is the benefit of keeping those two types in separate type hierarchies

Even if Observable implemented OnSubscribe, someone still needs to wrap OnSubscribe to become an Observable since Java doesn't support extension methods, thus to get the behavior of Observable the inner function (which we call OnSubscribe) has to be put inside the Observable type.

@benjchristensen
Member

Closing this so it no longer is on the pull request page, but I'm okay continuing the discussion.

@benjchristensen
Member

Regarding those wrapper methods (i.e. Observable.create(OnSubscribeFunc) in master), I have counted the calls in the version of master I last merged and in my branch. My branch contains 292 calls; master contains more than twice as many: 664.

These will almost all go away with the use of lift.

@rickbw
Contributor Author

rickbw commented Mar 25, 2014

Thanks for your detailed thoughts in this discussion. I think the 0.17 API has the capabilities it needs to be useful. If I were writing it, I probably wouldn't have decomposed those capabilities the same way: e.g. the current API has even more places where user-supplied code and library-supplied code are blended into a single type; that seems like a code smell to me. However, the distance from one point to the other is now greater than it was when I started this endeavor, so I think my differences of opinion are largely moot at this point. :-) I will continue to use RxJava gladly, and I look forward to contributing in other ways in the future.


4 participants