Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriber linking up for unsubscriptions #830

Closed
akarnokd opened this issue Feb 6, 2014 · 10 comments
Closed

Subscriber linking up for unsubscriptions #830

akarnokd opened this issue Feb 6, 2014 · 10 comments

Comments

@akarnokd
Copy link
Member

akarnokd commented Feb 6, 2014

If I understand correctly, a Subscriber should send unsubscriptions only upwards but never downwards. So whenever lift is used one would need to chain up Subscribers like this:

Subscriber<U> call(Subscriber<T> o) {
   Subscriber<U> u = ...
   o.add(u);
   return u;
}

Here, o comes from the client. Calling o.unsubscribe will propagate to u.unsubscribe and up on the chain. However, if u.unsubscribe is called, It won't affect o's subscriptions, i.e., u should send out onCompleted event to affect downstream.

However, the OperatorMap doesn't do this but basically shares the same CompositeSubscription between o and u, therefore, unsubscription will affect both upstream and downstream. OperatorTake completely ignores unsubscription coming from o as well, letting the following example spin trough the 1G values in the background.

Observable.range(0, 1_000_000_000).take(1_000_000_000).take(1).toBlockingObservable().last();
Thread.sleep(5000);
@benjchristensen
Copy link
Member

Yep it was certainly broken. Fixed in #833.

Thanks for the report.

@benjchristensen
Copy link
Member

This unit test proved the issue and is now passing:

    public void testMultiTake() {
        final AtomicInteger count = new AtomicInteger();
        Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> s) {
                for (int i = 0; !s.isUnsubscribed(); i++) {
                    System.out.println("Emit: " + i);
                    count.incrementAndGet();
                    s.onNext(i);
                }
            }

        }).take(100).take(1).toBlockingObservable().forEach(new Action1<Integer>() {

            @Override
            public void call(Integer t1) {
                System.out.println("Receive: " + t1);

            }

        });

        assertEquals(1, count.get());
    }

@akarnokd
Copy link
Member Author

akarnokd commented Feb 7, 2014

What about OperatorMap?

@benjchristensen
Copy link
Member

What about map? It doesn't decouple the subscription chain but just passes it through.

    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

@akarnokd
Copy link
Member Author

akarnokd commented Feb 7, 2014

Consider this (running on head as of now):

public static void main(String[] args) throws Exception {
        AtomicInteger count = new AtomicInteger();
        Observable.from(1, 2)
                .map(v -> v * 2).take(1)
                .subscribe(new Subscriber<Integer>() {

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(Integer t) {
                add(Schedulers.newThread().schedule(
                        i -> count.incrementAndGet(), 500, TimeUnit.MILLISECONDS
                ));
            }
        });

        Thread.sleep(1000);

        System.out.println(count);
    }

It prints 0 for me, because once take unsubscribes from map, map (or take?) propagates that unsubscription downwards as well, disrupting the delayed schedule.

@benjchristensen
Copy link
Member

Yes, that's how it behaves, but not because take unsubscribes. It's because onComplete happens which then triggers unsubscribe from the bottom up to do cleanup inside the SafeSubscriber. That is by design.

Rx Design Guideline 4.3. Assume resources are cleaned up after an OnError or OnCompleted
message
Paragraph 4.1 states that no more messages should arrive after an OnError or OnCompleted message. This makes it possible to cleanup any resource used by the subscription the moment an OnError or OnCompleted arrives. Cleaning up resources immediately will make sure that any side-effect occurs in a predictable fashion. It also makes sure that the runtime can reclaim these resources.

Thus, adding the Scheduler Subscription to the Subscriber but wanting it to be run after onCompleted won't work. You would need to remove the add so the scheduling 500ms in the future happens regardless of onComplete happening.

(FYI that I won't be responding to this further tonight. It's past midnight and my brain is done.)

@akarnokd
Copy link
Member Author

akarnokd commented Feb 7, 2014

Surprisingly, this works:

Observable.from(1).take(1).delay(v -> Observable.timer(500, TimeUnit.MILLISECONDS)).subscribe(s);

even if delay clearly violates the guideline above as take will call unsubscribe before delay even emits its value. It seems delay works only because its CompositeSubscription is disconnected from upstream so SafeSubscriber can't unsubscribe it before the actions were taken.

So if I rewrite delay, pending onNexts won't get run because SafeSubscriber between the two will cancel the schedule/subscriptions. I think SafeSubscriber shouldn't force its actual subscriber (which is downstream) to unsubscribe.

@benjchristensen
Copy link
Member

even if delay clearly violates the guideline above

It doesn't violate the guideline, delay is also delaying the onComplete thus the unsubscribe of SafeSubscriber is not triggered until after delay completes.

Surprisingly, this works:

It's not surprising since delay is an intermediate operator and thus does not have SafeSubscriber applied to it. This is yet another reason why lift is actually a clearer representation of what is happening – the intermediate operations are not subscribing, their functions are lifted into the Observable and executed in sequence when the Observable is subscribed to by the final subscribe(Subscriber).

The only remaining oddity now that we have lift that doesn't cleanly match are nested Observable use cases where an Operator must subscribe to them (such as groupBy, merge, zip, repeat). We still rely on the isInternalImplementation check to prevent SafeSubscriber from wrapping those. That is the intent of this discussion: #676.

During the lift prototyping I had considered a private or different method than subscribe for operators to use but it makes the public API awkward to do so. Likely we will end up with an rx.operators.Subscriber/TrustedSubscriber/OperatorSubscriber marker interface.

I think SafeSubscriber shouldn't force its actual subscriber (which is downstream) to unsubscribe.

If this were to be changed that would mean our current interpretation and implementation of guideline 4.3 is wrong.

As per the current interpretation the "actual subscriber" is not downstream. The SafeSubscriber is the absolute last thing in the sequence and this is because the subscribe step is the final in the chain, it's the exit point.

Observable.from(1).take(1).delay(v -> Observable.timer(500, TimeUnit.MILLISECONDS)).subscribe(s);

The final subscribe(s) there is the end. That's why it returns Subscription (or could equally return Void now). Thus, once the onComplete/onError is invoked it is terminated and the full sequence is unsubscribed.

If SafeSubscriber did not unsubscribe the actual Subscriber, what would it unsubscribe? The intent is to cleanup all resources and the injected Subscriber is one of the resources that is now terminated and being cleaned up.

Considering this interpretation of guideline 4.3, what code changes would you make?

@headinthebox Your input would be valuable to correct any misunderstandings or wrong implementations of the guidelines we have. The particular line of the guideline that influenced this implementation is:

The Rx contract for cleanup ensures that unsubscription will be called automatically once an OnError or OnCompleted message is sent.

Specifically we have interpreted that to mean when onCompleted/onError happens at the very end, not to intermediate steps (since that didn't make sense nor does it work).

@akarnokd
Copy link
Member Author

akarnokd commented Feb 7, 2014

Thanks. From implementation perspective, I have two concerns:

  • As an internal implementation, the Subscriber returned by my lifter method will be unsubscribed from upstream depending on what other operator is there: the case where basically there is a single CompositeSubscription shared between various lifting Subscribers.
  • As an external implementation, I can't use the Subscriber's add() method because SafeSubscriber will unsubscribe it even if I want different behavior or do something after onCompleted arrived from upstream (i.e., subscribe to another source).

Most likely both situation can be bypassed via nest().

@benjchristensen
Copy link
Member

will be unsubscribed from upstream

If an operator needs to decouple from this (such as groupBy) then it passes a different Subscription up the chain.

The map operator for example doesn't care. It just transforms data it receives.

As an external implementation, I can't use the Subscriber's add() method

Sure you can, within the lifecycle of that Subscriber. If you don't want to work within the lifecycle then you must decouple just like groupBy does. If you want to do something after onComplete you are by definition working outside the lifecycle of that Subscriber. Since you're at the end of the chain you would not have another Subscription to attach to and thus just fire-and-forget as in the example given above with a Scheduler inside onNext.

In short, the subscribe method is not the place to build a chaining operator, implementing the lift function is and then you decouple the Subscription as necessary. If you use subscribe(Subscriber s) then it is the end of the chain.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants