fix(subscriber): don't unsubscribe self #4106
Thank you for bringing this issue to my attention. Taking a step back, I think that |
See my comment, and refer to our offline discussion.
@benlesh This PR is now a WIP, as I've made some changes to make things simpler, but the changes need to be applied to many of the operator-specific subscribers and I've only changed I've removed all of the parent-teardown business and the The Anyway, the important parts are in To me this is simpler than dealing with children, parents and destinations. In the original implementation - that preceded the PR that sought to address the not-unsubscribing-from-source problem - a subscription was added to its destination so that unsubscribing the destination unsubscribes the subscription. That behaviour is now restored. And because the subscription's destination is not added to the subscription, calling If you think this approach is a better way to go, I can apply similar changes to the other subscribers. (It's not just the flattening subscribers. E.g. the subscribers for We would need to decide the best way of adding subscriptions to the destination, as the type assertions are rather ugly:
const destination = this.destination as any as Subscription;
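The wiring described in this comment (a subscription is added to its destination, and the destination is not added to the subscription) can be sketched roughly as follows. `MiniSubscription` and `runDestinationSketch` are hypothetical stand-ins invented for illustration; they are not the RxJS classes and leave out most of what the real `Subscription` does.

```typescript
// Hypothetical stand-in, NOT the RxJS Subscription class.
class MiniSubscription {
  closed = false;
  private children: MiniSubscription[] = [];

  // Adding a child means: when this subscription is unsubscribed,
  // the child is unsubscribed too. The reverse link is never created.
  add(child: MiniSubscription): void {
    if (child === this) {
      return;
    }
    if (this.closed) {
      child.unsubscribe();
    } else {
      this.children.push(child);
    }
  }

  unsubscribe(): void {
    if (this.closed) {
      return;
    }
    this.closed = true;
    const children = this.children;
    this.children = [];
    for (const child of children) {
      child.unsubscribe();
    }
  }
}

function runDestinationSketch() {
  const destination = new MiniSubscription();
  const innerA = new MiniSubscription();
  const innerB = new MiniSubscription();
  destination.add(innerA);
  destination.add(innerB);

  // Unsubscribing one inner subscription does not touch the destination.
  innerA.unsubscribe();
  const destinationOpenAfterInner = !destination.closed;

  // Unsubscribing the destination tears down whatever was added to it.
  destination.unsubscribe();
  return {
    destinationOpenAfterInner,
    innerAClosed: innerA.closed,
    innerBClosed: innerB.closed,
  };
}

const destinationSketch = runDestinationSketch();
console.log(destinationSketch);
```

Because the link only runs from destination to subscription, an inner subscription that finishes early cannot close the destination by accident.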
Yes. This is precisely what I was suggesting when we discussed this yesterday.
spec/operators/mergeMap-spec.ts
Outdated
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';

declare const type: Function;
declare const asDiagram: Function;

/** @test {mergeMap} */
- describe('mergeMap', () => {
+ describe.only('mergeMap', () => {
I know this is a WIP, but don't forget to remove this `only`.
src/internal/operators/mergeMap.ts
Outdated
@@ -140,7 +140,8 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {

private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
const destination = this.destination as any as Subscription;
`destination` is a `Subscriber`, which extends `Subscription`... I'm not sure why any casting is necessary here... 🤔
Nope, it's a `PartialObserver`:
protected destination: PartialObserver<any>;
Changing `destination` to be:
protected destination: PartialObserver<any> | Subscriber<any>;
means the type assertion becomes:
const destination = this.destination as Subscription;
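For readers who want to see the two assertion styles side by side, here is a minimal sketch. The `Sketch*` types are hypothetical, cut-down stand-ins written only for this example (they are not the RxJS declarations), and the two helper functions are invented names.

```typescript
// Hypothetical, cut-down stand-ins for the types under discussion.
class SketchSubscription {
  closed = false;
  private children: SketchSubscription[] = [];

  add(child: SketchSubscription): void {
    this.children.push(child);
  }

  unsubscribe(): void {
    if (this.closed) {
      return;
    }
    this.closed = true;
    for (const child of this.children) {
      child.unsubscribe();
    }
  }
}

interface SketchPartialObserver<T> {
  closed?: boolean;
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

class SketchSubscriber<T> extends SketchSubscription implements SketchPartialObserver<T> {
  next(_value: T): void {}
  error(_err: unknown): void {}
  complete(): void {}
}

// Destination typed as a partial observer only: the double assertion from
// the diff goes through `any` to reach the subscription type.
function addViaDoubleAssertion(
  destination: SketchPartialObserver<any>,
  inner: SketchSubscription
): void {
  (destination as any as SketchSubscription).add(inner);
}

// Destination typed as the union: one member of the union is a subclass of
// the subscription type, so a single assertion is enough.
function addViaSingleAssertion(
  destination: SketchPartialObserver<any> | SketchSubscriber<any>,
  inner: SketchSubscription
): void {
  (destination as SketchSubscription).add(inner);
}

const assertionDestination = new SketchSubscriber<number>();
const assertionInnerA = new SketchSubscription();
const assertionInnerB = new SketchSubscription();
addViaDoubleAssertion(assertionDestination, assertionInnerA);
addViaSingleAssertion(assertionDestination, assertionInnerB);
assertionDestination.unsubscribe();
console.log(assertionInnerA.closed, assertionInnerB.closed);
```

Both helpers behave the same at runtime; the difference is only in how much the type checker is bypassed. Neither assertion verifies that the destination really is a subscription, so both rely on the caller passing a subscriber.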
src/internal/Subscriber.ts
Outdated
@@ -158,16 +156,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {

/** @deprecated This is an internal implementation detail, do not use. */
_addParentTeardownLogic(parentTeardownLogic: TeardownLogic) {
We can probably just remove _addParentTeardownLogic entirely.
- It's prefixed with `_`, which is a universal signal for "do not use".
- It's been marked as "deprecated do not use" since the moment it was added.
My concern is that the name has been used in the `isTrustedSubscriber` test and that's been released. I wasn't entirely sure what the effect of its removal in a new release would be. I suppose it means a trusted subscriber could end up being re-wrapped in a using-6.3.3-with-6.3.2 scenario.
When unsubscribing a subscriber's parent, make sure that the subscriber itself is not unsubscribed. Closes ReactiveX#4095
@benlesh This should be done, now. It's been rebased. The commits made subsequent to your comments are those that follow |
@cartant I think we should schedule a time to walk through this one.
Remove the mapTo and the concat to reduce the number of subscribers to make the test easier to reason with.
What follows are notes on the history of changes made in this PR. #3963 (which was a refactoring of #2457) made changes to the way in which subscribers were wired up. The changes were made so that teardown logic for sources was called as soon as sources completed or errored. Its changes were:
The major problem with this approach was that a subscriber could be added to itself, as
This PR added a test that included this code:
const wrapped = new Observable<number>(subscriber => {
  const subscription = timer(0, asapScheduler).subscribe(subscriber);
  return () => subscription.unsubscribe();
});
wrapped.pipe(
  mergeMap(() => timer(0, asapScheduler))
).subscribe({
  next(value) { results.push(value); },
  complete() { results.push('done'); }
});
When run with the changes in #3963, the sequence of subscribers/subscriptions was:
Note that the This PR reverted the changes in #3963 and made the following changes to a number of operators:
With these changes, the test now passes and the sequence of subscribers/subscriptions is:
Note that the The changes in this PR affected a number of behaviour tests in |
Description:
This PR fixes a problem that was introduced in #3963 and was partially fixed in #4072.
The problem was introduced with this change - to `Subscriber` - in #3963 where a parent subscription is unsubscribed upon a `complete` notification:
complete(): void {
  if (!this.isStopped) {
    this.isStopped = true;
    this._complete();
+   this._unsubscribeParentSubscription();
  }
}
The parent subscription is added to the subscriber in `Observable.subscribe`.
In numerous situations, the subscription passed to `_addParentTeardownLogic` will be the same subscriber. That is, it will be `sink`.
Adding `sink` to itself, as a parent, introduces a bug, as `sink` will be unsubscribed from within the `complete` method - which differs from the pre-#3963 mechanism.
What follows below is a description of the PR's original implementation. This implementation was changed substantially as the PR was discussed. For an explanation of the PR's final, merged changes, skip to my comment at the bottom of this conversation.
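As a rough illustration of the self-parenting bug described in this PR, the sketch below models only the parts involved. `ParentSketchSubscription`, `ParentSketchSubscriber` and `addParent` are hypothetical names made up for this example; `addParent` merely stands in for the role `_addParentTeardownLogic` plays, and none of this is the RxJS implementation.

```typescript
// Hypothetical stand-ins, NOT the RxJS classes.
class ParentSketchSubscription {
  closed = false;

  unsubscribe(): void {
    this.closed = true;
  }
}

class ParentSketchSubscriber extends ParentSketchSubscription {
  isStopped = false;
  private parent: ParentSketchSubscription | null = null;

  // Plays the role of registering the parent's teardown logic.
  addParent(parent: ParentSketchSubscription): void {
    this.parent = parent;
  }

  complete(): void {
    if (!this.isStopped) {
      this.isStopped = true;
      // Mirrors the behaviour added in #3963: unsubscribe the parent
      // as soon as the complete notification arrives.
      const parent = this.parent;
      this.parent = null;
      if (parent) {
        parent.unsubscribe();
      }
    }
  }
}

// Case 1: the parent is a separate subscription. Only the parent closes.
const separateParent = new ParentSketchSubscription();
const sinkWithSeparateParent = new ParentSketchSubscriber();
sinkWithSeparateParent.addParent(separateParent);
sinkWithSeparateParent.complete();

// Case 2: the sink is its own parent. Completing it also unsubscribes it.
const selfParentedSink = new ParentSketchSubscriber();
selfParentedSink.addParent(selfParentedSink);
selfParentedSink.complete();

console.log(separateParent.closed);          // true
console.log(sinkWithSeparateParent.closed);  // false
console.log(selfParentedSink.closed);        // true
```

The last line is the problematic outcome: the sink ends up closed from inside its own `complete` call, before anything downstream has asked for it to be unsubscribed.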
#4072 introduced a check to ensure that a specific subscriber isn't added to itself as a parent. However, it's still possible for a subscriber that is a child of the specific subscriber to be added as a parent. And doing so will see the specific subscriber's `unsubscribe` called within its `complete` method - which can effect unwanted behaviour. See the failing test that's included in this PR.
This PR solves the problem by adding a `_keepAliveCount` that's incremented before unsubscribing the parent subscription and is decremented afterwards. In `Subscription.unsubscribe`, if `_keepAliveCount` is greater than zero, the subscription is not unsubscribed.
Related issue (if exists): #4095
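The keep-alive guard from the original implementation can be sketched as below, assuming a much simplified model. `GuardedSubscription`, `GuardedSubscriber`, `addParent` and the `keepAliveCount` field are hypothetical stand-ins for illustration only; the sketch follows the description above rather than the actual source, and this approach was later replaced as the PR evolved.

```typescript
// Hypothetical stand-ins, NOT the RxJS classes.
class GuardedSubscription {
  closed = false;
  protected keepAliveCount = 0; // stands in for _keepAliveCount

  constructor(private teardown?: () => void) {}

  unsubscribe(): void {
    // While the count is above zero, unsubscribe requests are ignored.
    if (this.closed || this.keepAliveCount > 0) {
      return;
    }
    this.closed = true;
    if (this.teardown) {
      this.teardown();
    }
  }
}

class GuardedSubscriber extends GuardedSubscription {
  isStopped = false;
  private parent: GuardedSubscription | null = null;

  addParent(parent: GuardedSubscription): void {
    this.parent = parent;
  }

  complete(): void {
    if (this.isStopped) {
      return;
    }
    this.isStopped = true;
    const parent = this.parent;
    this.parent = null;
    if (parent) {
      // Increment before unsubscribing the parent, decrement afterwards.
      this.keepAliveCount++;
      try {
        parent.unsubscribe();
      } finally {
        this.keepAliveCount--;
      }
    }
  }
}

// The parent's teardown reaches back to the sink, as a child subscription might.
const guardedSink = new GuardedSubscriber();
const guardedParent = new GuardedSubscription(() => guardedSink.unsubscribe());
guardedSink.addParent(guardedParent);

guardedSink.complete();
const sinkClosedAfterComplete = guardedSink.closed;

// Once complete has returned, an explicit unsubscribe works as usual.
guardedSink.unsubscribe();
const sinkClosedAfterUnsubscribe = guardedSink.closed;

console.log(guardedParent.closed, sinkClosedAfterComplete, sinkClosedAfterUnsubscribe);
```

A counter rather than a boolean lets nested guarded sections overlap without the inner one re-enabling unsubscription too early.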