Skip to content

Commit

Permalink
fix(ConnectableObservable): fix ConnectableObservable connectability …
Browse files Browse the repository at this point in the history
…and refCounting

Fix ConnectableObservable, its connect() method, and the RefCountObservable to support synchronous
retry/repeat in the presence of multiple subscribers, and to support retry/repeat in other
asynchronous scenarios. This commit is a major rework of ConnectableObservable.

Resolves bug ReactiveX#678.
  • Loading branch information
staltz committed Nov 19, 2015
1 parent 73aaaa9 commit 8e890f7
Showing 1 changed file with 73 additions and 8 deletions.
81 changes: 73 additions & 8 deletions src/observables/ConnectableObservable.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Subject} from '../Subject';
import {Observable} from '../Observable';
import {Subscription} from '../Subscription';
import {Subscriber} from '../Subscriber';

export class ConnectableObservable<T> extends Observable<T> {

Expand All @@ -24,7 +25,16 @@ export class ConnectableObservable<T> extends Observable<T> {
return (this.subject = this.subjectFactory());
}

connect() {
connect(onSubscribe?: (subscription: Subscription<T>) => void): Subscription<T> {
if (onSubscribe) {
this._callbackConnect(onSubscribe);
return null;
} else {
return this._returningConnect();
}
}

_returningConnect(): Subscription<T> {
const source = this.source;
let subscription = this.subscription;
if (subscription && !subscription.isUnsubscribed) {
Expand All @@ -35,6 +45,26 @@ export class ConnectableObservable<T> extends Observable<T> {
return (this.subscription = subscription);
}

/**
* Instructs the ConnectableObservable to begin emitting the items from its
* underlying source to its Subscribers.
*
* @param onSubscribe a function that receives the connection subscription
* before the subscription to source happens, allowing the caller to
* synchronously disconnect a synchronous source.
*/
_callbackConnect(onSubscribe: (subscription: Subscription<T>) => void): void {
let subscription = this.subscription;
if (subscription && !subscription.isUnsubscribed) {
onSubscribe(subscription);
return;
}
this.subscription = subscription = new Subscription();
onSubscribe(subscription);
subscription.add(this.source.subscribe(this._getSubject()));
subscription.add(new ConnectableSubscription(this));
}

refCount(): Observable<T> {
return new RefCountObservable(this);
}
Expand Down Expand Up @@ -62,24 +92,59 @@ class RefCountObservable<T> extends Observable<T> {

_subscribe(subscriber) {
const connectable = this.connectable;
const subscription = connectable.subscribe(subscriber);
const refCountSubscriber = new RefCountSubscriber(subscriber, this);
refCountSubscriber.myConnection = this.connection;
const subscription = connectable.subscribe(refCountSubscriber);

if (++this.refCount === 1) {
this.connection = connectable.connect();
connectable.connect(subscription => {
this.connection = subscription;
refCountSubscriber.myConnection = subscription;
});
}
subscription.add(new RefCountSubscription(this));
return subscription;
}
}

class RefCountSubscription<T> extends Subscription<T> {
class RefCountSubscriber<T> extends Subscriber<T> {
myConnection: Subscription<T>;

constructor(private refCountObservable: RefCountObservable<T>) {
super();
constructor(public destination: Subscriber<T>,
private refCountObservable: RefCountObservable<T>) {
super(null);
destination.add(this);
}

_next(value: T) {
this.destination.next(value);
}

_error(err: any) {
this._resetConnectable();
this.destination.error(err);
}

_complete() {
this._resetConnectable();
this.destination.complete();
}

_resetConnectable() {
const observable = this.refCountObservable;
if (this.myConnection === observable.connection) {
observable.refCount = 0;
observable.connection.unsubscribe();
observable.connection = void 0;
this.unsubscribe();
}
}

_unsubscribe() {
const observable = this.refCountObservable;
if (--observable.refCount === 0) {
if (observable.refCount === 0) {
return;
}
if (--observable.refCount === 0 && this.myConnection === observable.connection) {
observable.connection.unsubscribe();
observable.connection = void 0;
}
Expand Down

0 comments on commit 8e890f7

Please sign in to comment.