Skip to content

Commit

Permalink
fix(ConnectableObservable): fix ConnectableObservable connectability …
Browse files Browse the repository at this point in the history
…and refCounting

When the ConnectableObservable with refCount always shares the same instance of
the underlying subject (such as in publish, publishReplay, publishBehavior), the
subscription to the connectable observable should NOT incur additional subscriptions
to the underlying cold source. See how tests for
publish/publishBehavior/publishReplay were updated to assert that only one
subscription to the underlying cold source happens, not multiple, because as soon
as the multicasting subject raises an error, this error impedes subsequent
subscriptions to the cold source from happening.

Fix ConnectableObservable, its connect() method, and the RefCountObservable to
support synchronous retry/repeat in the presence of multiple subscribers, and to
support retry/repeat in other asynchronous scenarios.

Resolves bug ReactiveX#678.
  • Loading branch information
staltz committed Nov 25, 2015
1 parent 2f10951 commit eb1e4b6
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 30 deletions.
9 changes: 2 additions & 7 deletions spec/operators/publish-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ describe('Observable.prototype.publish()', function () {

it('should NOT be retryable', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = ['^ !',
' (^!)',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publish().refCount().retry(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '-1-2-3----4-#';
Expand All @@ -152,9 +149,7 @@ describe('Observable.prototype.publish()', function () {

it('should NOT be repeatable', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = ['^ !',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publish().refCount().repeat(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '-1-2-3----4-|';
Expand Down
9 changes: 2 additions & 7 deletions spec/operators/publishBehavior-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,7 @@ describe('Observable.prototype.publishBehavior()', function () {

it('should NOT be retryable', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = ['^ !',
' (^!)',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publishBehavior('0').refCount().retry(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '01-2-3----4-#';
Expand All @@ -151,9 +148,7 @@ describe('Observable.prototype.publishBehavior()', function () {

it('should NOT be repeatable', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = ['^ !',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publishBehavior('0').refCount().repeat(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '01-2-3----4-|';
Expand Down
9 changes: 2 additions & 7 deletions spec/operators/publishReplay-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,7 @@ describe('Observable.prototype.publishReplay()', function () {

it('should NOT be retryable', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = ['^ !',
' (^!)',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publishReplay(1).refCount().retry(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '-1-2-3----4-(444#)';
Expand All @@ -170,9 +167,7 @@ describe('Observable.prototype.publishReplay()', function () {

it('should NOT be repeatable', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = ['^ !',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publishReplay(1).refCount().repeat(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '-1-2-3----4-(44|)';
Expand Down
84 changes: 75 additions & 9 deletions src/observables/ConnectableObservable.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Subject} from '../Subject';
import {Observable} from '../Observable';
import {Subscription} from '../Subscription';
import {Subscriber} from '../Subscriber';

export class ConnectableObservable<T> extends Observable<T> {

Expand All @@ -24,7 +25,16 @@ export class ConnectableObservable<T> extends Observable<T> {
return (this.subject = this.subjectFactory());
}

connect() {
connect(onSubscribe?: (subscription: Subscription<T>) => void): Subscription<T> {
if (onSubscribe) {
this._callbackConnect(onSubscribe);
return null;
} else {
return this._returningConnect();
}
}

_returningConnect(): Subscription<T> {
const source = this.source;
let subscription = this.subscription;
if (subscription && !subscription.isUnsubscribed) {
Expand All @@ -35,6 +45,26 @@ export class ConnectableObservable<T> extends Observable<T> {
return (this.subscription = subscription);
}

/**
* Instructs the ConnectableObservable to begin emitting the items from its
* underlying source to its Subscribers.
*
* @param onSubscribe a function that receives the connection subscription
* before the subscription to source happens, allowing the caller to
* synchronously disconnect a synchronous source.
*/
_callbackConnect(onSubscribe: (subscription: Subscription<T>) => void): void {
let subscription = this.subscription;
if (subscription && !subscription.isUnsubscribed) {
onSubscribe(subscription);
return;
}
this.subscription = subscription = new Subscription();
onSubscribe(subscription);
subscription.add(this.source.subscribe(this._getSubject()));
subscription.add(new ConnectableSubscription(this));
}

refCount(): Observable<T> {
return new RefCountObservable(this);
}
Expand Down Expand Up @@ -62,24 +92,60 @@ class RefCountObservable<T> extends Observable<T> {

_subscribe(subscriber) {
const connectable = this.connectable;
const subscription = connectable.subscribe(subscriber);
if (++this.refCount === 1) {
this.connection = connectable.connect();
const refCountSubscriber = new RefCountSubscriber(subscriber, this);
refCountSubscriber.myConnection = this.connection;
const subscription = connectable.subscribe(refCountSubscriber);

if (!subscription.isUnsubscribed && ++this.refCount === 1) {
connectable.connect(_subscription => {
refCountSubscriber.myConnection = this.connection = _subscription;
});
}
subscription.add(new RefCountSubscription(this));
return subscription;
}
}

class RefCountSubscription<T> extends Subscription<T> {
class RefCountSubscriber<T> extends Subscriber<T> {
myConnection: Subscription<T>;

constructor(private refCountObservable: RefCountObservable<T>) {
super();
constructor(public destination: Subscriber<T>,
private refCountObservable: RefCountObservable<T>) {
super(null);
destination.add(this);
}

_next(value: T) {
this.destination.next(value);
}

_error(err: any) {
this._resetConnectable();
this.destination.error(err);
}

_complete() {
this._resetConnectable();
this.destination.complete();
}

_resetConnectable() {
const observable = this.refCountObservable;
const myConnection = this.myConnection;
if (myConnection && myConnection === observable.connection) {
observable.refCount = 0;
observable.connection.unsubscribe();
observable.connection = void 0;
this.unsubscribe();
}
}

_unsubscribe() {
const observable = this.refCountObservable;
if (--observable.refCount === 0) {
if (observable.refCount === 0) {
return;
}
const myConnection = this.myConnection;
if (--observable.refCount === 0 && myConnection && myConnection === observable.connection) {
observable.connection.unsubscribe();
observable.connection = void 0;
}
Expand Down

0 comments on commit eb1e4b6

Please sign in to comment.