diff --git a/src/observables/ConnectableObservable.ts b/src/observables/ConnectableObservable.ts index 6f7a9546a2e..683e2530124 100644 --- a/src/observables/ConnectableObservable.ts +++ b/src/observables/ConnectableObservable.ts @@ -1,6 +1,7 @@ import {Subject} from '../Subject'; import {Observable} from '../Observable'; import {Subscription} from '../Subscription'; +import {Subscriber} from '../Subscriber'; export class ConnectableObservable extends Observable { @@ -24,7 +25,16 @@ export class ConnectableObservable extends Observable { return (this.subject = this.subjectFactory()); } - connect() { + connect(onSubscribe?: (subscription: Subscription) => void): Subscription { + if (onSubscribe) { + this._callbackConnect(onSubscribe); + return null; + } else { + return this._returningConnect(); + } + } + + _returningConnect(): Subscription { const source = this.source; let subscription = this.subscription; if (subscription && !subscription.isUnsubscribed) { @@ -35,6 +45,26 @@ export class ConnectableObservable extends Observable { return (this.subscription = subscription); } + /** + * Instructs the ConnectableObservable to begin emitting the items from its + * underlying source to its Subscribers. + * + * @param onSubscribe a function that receives the connection subscription + * before the subscription to source happens, allowing the caller to + * synchronously disconnect a synchronous source. + */ + _callbackConnect(onSubscribe: (subscription: Subscription) => void): void { + let subscription = this.subscription; + if (subscription && !subscription.isUnsubscribed) { + onSubscribe(subscription); + return; + } + this.subscription = subscription = new Subscription(); + onSubscribe(subscription); + subscription.add(this.source.subscribe(this._getSubject())); + subscription.add(new ConnectableSubscription(this)); + } + refCount(): Observable { return new RefCountObservable(this); } @@ -62,24 +92,59 @@ class RefCountObservable extends Observable { _subscribe(subscriber) { const connectable = this.connectable; - const subscription = connectable.subscribe(subscriber); + const refCountSubscriber = new RefCountSubscriber(subscriber, this); + refCountSubscriber.myConnection = this.connection; + const subscription = connectable.subscribe(refCountSubscriber); + if (++this.refCount === 1) { - this.connection = connectable.connect(); + connectable.connect(subscription => { + this.connection = subscription; + refCountSubscriber.myConnection = subscription; + }); } - subscription.add(new RefCountSubscription(this)); return subscription; } } -class RefCountSubscription extends Subscription { +class RefCountSubscriber extends Subscriber { + myConnection: Subscription; - constructor(private refCountObservable: RefCountObservable) { - super(); + constructor(public destination: Subscriber, + private refCountObservable: RefCountObservable) { + super(null); + destination.add(this); + } + + _next(value: T) { + this.destination.next(value); + } + + _error(err: any) { + this._resetConnectable(); + this.destination.error(err); + } + + _complete() { + this._resetConnectable(); + this.destination.complete(); + } + + _resetConnectable() { + const observable = this.refCountObservable; + if (this.myConnection === observable.connection) { + observable.refCount = 0; + observable.connection.unsubscribe(); + observable.connection = void 0; + this.unsubscribe(); + } } _unsubscribe() { const observable = this.refCountObservable; - if (--observable.refCount === 0) { + if (observable.refCount === 0) { + return; + } + if (--observable.refCount === 0 && this.myConnection === observable.connection) { observable.connection.unsubscribe(); observable.connection = void 0; }