Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observer and/or Subscriber? #792

Closed
benjchristensen opened this issue Jan 30, 2014 · 13 comments
Closed

Observer and/or Subscriber? #792

benjchristensen opened this issue Jan 30, 2014 · 13 comments

Comments

@benjchristensen
Copy link
Member

A discussion in issue #746 (comment) related to Observer having become an abstract class led to the idea of keeping Observer as an interface and introducing a new type Subscriber to represent Observer + Subscription.

I have implemented this change on this branch for review: https://github.com/benjchristensen/RxJava/tree/subscriber-observer

Relevant classes here: https://github.com/benjchristensen/RxJava/tree/subscriber-observer/rxjava-core/src/main/java/rx

The signatures currently are:

// Observable.create
public final static <T> Observable<T> create(OnSubscribe<T> f)

// Observable.OnSubscribe typed function interface
public static interface OnSubscribe<T> extends Action1<Observer<? super T>>


// Observable.subscribe
public final Subscription subscribe(Observer<? super T> observer)

// lift function
public <R> Observable<R> lift(final Func1<Observer<? super R>, Observer<? super T>> bind)

// Observer
public abstract class Observer<T> implements Subscription {
     public abstract void onNext(T t);
     public abstract void onError(Throwable e);
     public abstract void onCompleted();
     public final void add(Subscription s)
     public final void unsubscribe()
     public final boolean isUnsubscribed()
}

// Subject
public abstract class Subject<T, R> extends Observer<T> {
    public abstract Observable<R> toObservable();
}

This branch changes them to:

// Observable.create
public final static <T> Observable<T> create(OnSubscribe<T> f)

// Observable.OnSubscribe typed function interface
public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>

// Observable.subscribe
public final Subscription subscribe(Subscriber<? super T> subscriber)
public final Subscription subscribe(Observer<? super T> observer)

// lift function
public <R> Observable<R> lift(final Func1<Subscriber<? super R>, Subscriber<? super T>> bind)

// Observer
public interface Observer<T> {
     public abstract void onNext(T t);
     public abstract void onError(Throwable e);
     public abstract void onCompleted();
}

// Subscriber
public abstract class Subscriber<T> implements Observer<T>, Subscription {
     public final void add(Subscription s)
     public final void unsubscribe()
     public final boolean isUnsubscribed()
}

// Subject (this could probably be changed back to implementing Observer and extending Observable)
public abstract class Subject<T, R> extends Subscriber<T> {
    public abstract Observable<R> toObservable();
}

This allows an Observer to subscribe:

        Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> ob) {
                for (int i = 1; i < 100000; i++) {
                    /*
                     * The Observer communicates whether it is unsubscribed
                     * so loops and seqential processing on the same thread
                     * can now unsubscribe.
                     */
                    if (ob.isUnsubscribed()) {
                        System.out.println("--- Unsubscribed at: " + i);
                        return;
                    }
                    ob.onNext(i);
                }
                ob.onCompleted();
            }

        }).subscribe(new Observer<Integer>() {

            @Override
            public void onCompleted() {
                System.out.println("Completed");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("Received: " + i);
                if (i == 10) {
                    // a Subscriber can unsubscribe, but an Observer can not
                    //                    unsubscribe();
                }
            }

        });

... but notice that an Observer alone can not unsubscribe.

A Subscriber however can unsubscribe:

package rx;

import rx.Observable.OnSubscribe;

public class Test {

    public static void main(String args[]) {

        Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> ob) {
                for (int i = 1; i < 100000; i++) {
                    /*
                     * The Observer communicates whether it is unsubscribed
                     * so loops and seqential processing on the same thread
                     * can now unsubscribe.
                     */
                    if (ob.isUnsubscribed()) {
                        System.out.println("--- Unsubscribed at: " + i);
                        return;
                    }
                    ob.onNext(i);
                }
                ob.onCompleted();
            }

        }).subscribe(new Subscriber<Integer>() {

            @Override
            public void onCompleted() {
                System.out.println("Completed");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("Received: " + i);
                if (i == 10) {
                    // a Subscriber can unsubscribe, but an Observer can not
                    unsubscribe();
                }
            }

        });

    }

}

Pros of this change:

  • Observer remains an interface which allows classes to implement it and not run into multiple-inheritance issues
  • Unit tests are "clean" without need for wrapping a mock Observer in TestObserver
  • Most use cases can just use subscribe(Observer) with the interface (but it's just as easy with subscribe(Subscriber) unless you need multiple-inheritance)

Cons of this change:

  • There is a new Subscriber type and users would need to understand the difference between Observer and Subscriber
  • There are both subscribe(Subscriber) and subscribe(Observer) and if using subscribe(Observer) the Observer can not unsubscribe. However, it is quite rare that a user provided Observer actually needs the ability to unsubscribe and if they do they would use take, takeUntil or now be able to use Subscriber.
  • Operator implementations would need to be sure to use Subscriber and not Observer whereas if we only have Observer they have no choice. This should not be a problem for most users of RxJava however.

Please weigh in with opinions or better ideas.

/cc @headinthebox @mttkay @akarnokd @zsxwing @abersnaze @samuelgruetter @jmhofer @mattrjacobs

@akarnokd
Copy link
Member

I like this newer structure better. As for the Subject, I would go for extends Observable implements Observer.

@headinthebox
Copy link
Contributor

Agree with @akarnokd on reverting Subject to the pre-tectonic shift definition.

Generally, I guess this is the design that the gravity of the Java type system is pulling us to.

On good thing I guess is that it minimizes the changes to the external API service,
and only puts the additional constraints on those who want to implement operators.

@mttkay
Copy link
Contributor

mttkay commented Jan 30, 2014

definitely +1 on this one.

I don't think it's too much to ask to familiarize yourself with the
Subscriber type when writing your own operators, and for the more likely
case that you don't write your own, the interfaces remain simple, familiar
and unit tests straight forward and clean.

Good compromise!

@benjchristensen
Copy link
Member Author

Thanks for the feedback. I'll proceed down this path. I prefer the purity of having only Observer but I agree that Subscriber + Observer is the better solution (of those we've considered thus far) for working within the constraints of Java.

@abersnaze
Copy link
Contributor

  1. I would like to propose that we also have a remove(Subscription) on the Subscriber so that Operator implementations don't have to add there own CompositeSubscription instances to get that functionality. Any intermediates separating the Subscribers makes it impossible for the debug hook to build a fulling connected Observerable chain.

  2. Is there some way to alert/prevent internal operators from using the Subscription subscribe(Observer) to direct the migration to void subscribe(Subscriber) as soon as possible. Any use of the Subscription subscribe(Observer) method breaks synchronous Observerable unsubscribe support for the whole chain.

@ronanM
Copy link

ronanM commented Jan 30, 2014

Just a little comment (I don't if it's the good place here) :
For beginers it would be easyer to understand pushNext(...), pushCompleted(), pushError(...), in the "push" model of Observable.

@benjchristensen
Copy link
Member Author

I would like to propose that we also have a remove(Subscription) on the Subscriber so that Operator implementations don't have to add there own CompositeSubscription instances to get that functionality.

Why? What harm is it for Subscriber to compose CompositeSubscription? And why should an Operator ever remove a Subscription?

Is there some way to alert/prevent internal operators

What do you suggest?

@benjchristensen
Copy link
Member Author

For beginners it would be easier to understand pushNext(...), pushCompleted(), pushError(...), in the "push" model of Observable.

It all depends on the perspective. From the receiving side of an Observer it is not a push but a receipt. Thus, from the Observable.create side it may look like pushNext but from the Observable.subscribe side it looks like receiveNext.

The on prefix is a common "listener" or "handler" standard.

The two sides are not separated from each other as it just adds more type complexity, such as:

SendingObserver {
  pushNext(T)
  pushError(Throwable)
  pushCompleted()
}

ReceivingObserver {
  onNext(T)
  onError(Throwable)
  onCompleted()
}

In reality it would just end up like this:

Observer {
  final pushNext(T t) {
   onNext(t)
  }

  final pushError(Throwable e) {
   onError(e);
  }

  final pushCompleted() {
   onCompleted();
  }

  abstract onNext(T);
  abstract onError(Throwable);
  abstract onCompleted();
}

In which case, why not just invoke onNext directly instead of pushNext which just calls onNext?

Also, all implementations of Rx across languages use onNext, onCompleted, onError.

@mttkay
Copy link
Contributor

mttkay commented Jan 30, 2014

I think onNext is more intuitive since the name Observer suggests a
passive role. This is in line with the subject/observer pattern which is
widely known and adopted.

@ronanM
Copy link

ronanM commented Jan 30, 2014

I use RxJava for several months now, so I use onNext(..) & Co without problem. But when I explain Rx to beginers I often wrote :

Observable.create(new OnSubscribeFunction<String>() {
  public Subscription onSubscribe(Observer<String> pusher) {
    ...
    pusher.onNext(...)
    ...
  }
});

@abersnaze
Copy link
Contributor

On Jan 30, 2014, at 12:21 PM, Ben Christensen [email protected] wrote:

I would like to propose that we also have a remove(Subscription) on the Subscriber so that Operator implementations don't have to add there own CompositeSubscription instances to get that functionality.

Why? What harm is it for Subscriber to compose CompositeSubscription? And why should an Operator ever remove a Subscription?

Any intermediates separating the Subscribers makes it impossible for a debug hook to build a fulling connected Observerable chain. Ideally i would like to see CompositeSubscription renamed to Subscription so I could have a hook on add that would have to be used everywhere to allow me to build the Observable chains across all Operators.

Is there some way to alert/prevent internal operators

What do you suggest?

We could base when to do something by looking at the package like we do for wrapping with SafeObserverable. As for what to do it would be easiest to throw an exception. There currently isn't an option for nagging but still being functional.

@benjchristensen
Copy link
Member Author

CompositeSubscription renamed to Subscription

Not all Subscriptions have the signature of a CompositeSubscription. There is a reason for the various implementations.

Any intermediates separating the Subscribers

A bad operator implementation could do this no matter what we do as operators must be capable of decoupling the chain otherwise operators like groupBy can not be implemented.

Ultimately it is required for operators to be implemented correctly, otherwise it's not just the debug hook that will have problems, but subscriptions won't unsubscribe correctly and resources won't be cleaned up.

throw an exception

We can't throw an exception as nothing would work until every single operator was migrated. It would also defeat the point of allowing the deprecated OnSubscribeFunc being used until some future version when it is removed.

@benjchristensen
Copy link
Member Author

I have merged these changes into master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants