Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(shareReplay): properly retains history on subscribe #2910

Merged
merged 1 commit into from
Oct 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;
declare const time: typeof marbleTestingSignature.time;
declare const rxTestScheduler: typeof marbleTestingSignature.rxTestScheduler;

const Observable = Rx.Observable;

Expand Down Expand Up @@ -164,4 +166,17 @@ describe('Observable.prototype.shareReplay', () => {
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should not restart if refCount hits 0 due to unsubscriptions', () => {
const results = [];
const source = Rx.Observable.interval(10, rxTestScheduler)
.take(10)
.shareReplay(1);
const subs = source.subscribe(x => results.push(x));
rxTestScheduler.schedule(() => subs.unsubscribe(), 35);
rxTestScheduler.schedule(() => source.subscribe(x => results.push(x)), 54);

rxTestScheduler.flush();
expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]);
});
});
44 changes: 32 additions & 12 deletions src/operators/shareReplay.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,45 @@
import { Observable } from '../Observable';
import { multicast } from './multicast';
import { refCount } from './refCount';
import { ReplaySubject } from '../ReplaySubject';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { IScheduler } from '../Scheduler';

import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction } from '../interfaces';

/**
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler ): MonoTypeOperatorFunction<T> {
let subject: ReplaySubject<T>;
let refCount = 0;
let subscription: Subscription;
let hasError = false;
let isComplete = true;

const connectable = multicast(function shareReplaySubjectFactory(this: ConnectableObservable<T>) {
if (this._isComplete) {
return subject;
} else {
return (subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler));
return (source: Observable<T>) => new Observable<T>(observer => {
refCount++;
if (!subject || hasError) {
hasError = false;
subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
subscription = source.subscribe({
next(value) { subject.next(value); },
error(err) {
hasError = true;
subject.error(err);
},
complete() {
isComplete = true;
subject.complete();
},
});
}

const innerSub = subject.subscribe(observer);

return () => {
refCount--;
innerSub.unsubscribe();
if (subscription && refCount === 0 && !isComplete) {
subscription.unsubscribe();
}
};
});
return ((source: Observable<T>) => refCount()(connectable(source))) as MonoTypeOperatorFunction<T>;
};
};