Skip to content

Commit

Permalink
fix(timeout): Update timeout and timeoutWith to recycle their schedul…
Browse files Browse the repository at this point in the history
…ed timeout actions.

The timeout and timeoutWith operators should dispose their scheduled timeout actions on

unsubscription. Also, given the new scheduling architecture, they can recycle their scheduled

actions so just one action is allocated per subscription.

ReactiveX#2134 ReactiveX#2135
  • Loading branch information
trxcllnt authored and jayphelps committed Nov 20, 2016
1 parent e729d9a commit 1761f59
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 107 deletions.
23 changes: 23 additions & 0 deletions spec/operators/timeout-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,27 @@ describe('Observable.prototype.timeout', () => {
expectObservable(result).toBe(expected, values, value);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('--a--b--c---d--e--|');
const e1subs = '^ ! ';
const expected = '--a--b--c-- ';
const unsub = ' ! ';

const result = e1
.lift(function(source) {
const timeoutSubscriber = this;
const { action } = timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source._subscribe(timeoutSubscriber);
})
.timeout(50, null, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
26 changes: 26 additions & 0 deletions spec/operators/timeoutWith-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,30 @@ describe('Observable.prototype.timeoutWith', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('---a---b-----c----|');
const e1subs = '^ ! ';
const e2 = cold( '-x---y| ');
const e2subs = ' ^ ! ';
const expected = '---a---b----x-- ';
const unsub = ' ! ';

const result = e1
.lift(function(source) {
const timeoutSubscriber = this;
const { action } = timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source._subscribe(timeoutSubscriber);
})
.timeoutWith(40, e2, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
13 changes: 12 additions & 1 deletion spec/schedulers/VirtualTimeScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => {
v.flush();
expect(count).to.equal(3);
});
});

it('should not execute virtual actions that have been rescheduled before flush', () => {
const v = new VirtualTimeScheduler();
let messages = [];
let action: VirtualAction<string> = <VirtualAction<string>> v.schedule(function(state: string) {
messages.push(state);
}, 10, 'first message');
action = <VirtualAction<string>> action.schedule('second message' , 10);
v.flush();
expect(messages).to.deep.equal(['second message']);
});
});
69 changes: 20 additions & 49 deletions src/operator/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Action } from '../scheduler/Action';
import { async } from '../scheduler/async';
import { isDate } from '../util/isDate';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Scheduler } from '../Scheduler';
import { Observable } from '../Observable';
import { TeardownLogic } from '../Subscription';
import { Subscription } from '../Subscription';
import { TimeoutError } from '../util/TimeoutError';

/**
Expand Down Expand Up @@ -45,17 +45,8 @@ class TimeoutOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutSubscriber<T> extends Subscriber<T> {
private index: number = 0;
private _previousIndex: number = 0;
private action: Subscription = null;

get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}
private action: Action<TimeoutSubscriber<T>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
Expand All @@ -66,56 +57,36 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (source.previousIndex === currentIndex) {
source.notifyTimeout();
}
private static dispatchTimeout<T>(subscriber: TimeoutSubscriber<T>): void {
subscriber.error(subscriber.errorToSend);
}

private scheduleTimeout(): void {
const currentIndex = this.index;
const timeoutState = { subscriber: this, index: currentIndex };

this.cancelTimeout();
this.action = this.scheduler.schedule(
TimeoutSubscriber.dispatchTimeout, this.waitFor, timeoutState
);
this.add(this.action);

this.index++;
this._previousIndex = currentIndex;
}

private cancelTimeout(): void {
const { action } = this;
if (action !== null) {
this.remove(action);
action.unsubscribe();
this.action = null;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutSubscriber<T>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutSubscriber<T>>> this.scheduler.schedule(
TimeoutSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T): void {
this.destination.next(value);

if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any): void {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete(): void {
this.destination.complete();
this._hasCompleted = true;
}

notifyTimeout(): void {
this.error(this.errorToSend);
private _unsubscribe() {
this.action = null;
this.scheduler = null;
this.errorToSend = null;
}
}
86 changes: 29 additions & 57 deletions src/operator/timeoutWith.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Action } from '../scheduler/Action';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Scheduler } from '../Scheduler';
import { async } from '../scheduler/async';
import { Subscription, TeardownLogic } from '../Subscription';
import { TeardownLogic } from '../Subscription';
import { Observable, ObservableInput } from '../Observable';
import { isDate } from '../util/isDate';
import { OuterSubscriber } from '../OuterSubscriber';
Expand Down Expand Up @@ -49,81 +50,52 @@ class TimeoutWithOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
private timeoutSubscription: Subscription = undefined;
private action: Subscription = null;
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

constructor(public destination: Subscriber<T>,
private action: Action<TimeoutWithSubscriber<T, R>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
private waitFor: number,
private withObservable: ObservableInput<any>,
private scheduler: Scheduler) {
super();
destination.add(this);
super(destination);
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.handleTimeout();
}
private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
const { withObservable } = subscriber;
subscriber.unsubscribe();
subscriber.closed = false;
subscriber.isStopped = false;
subscriber.add(subscribeToResult(subscriber, withObservable));
}

private scheduleTimeout(): void {
const currentIndex = this.index;
const timeoutState = { subscriber: this, index: currentIndex };

this.cancelTimeout();
this.action = this.scheduler.schedule(
TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState
);
this.add(this.action);

this.index++;
this._previousIndex = currentIndex;
}

private cancelTimeout(): void {
const { action } = this;
if (action !== null) {
this.remove(action);
action.unsubscribe();
this.action = null;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule(
TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T) {
this.destination.next(value);
protected _next(value: T): void {
if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any) {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete() {
this.destination.complete();
this._hasCompleted = true;
}

handleTimeout(): void {
if (!this.closed) {
const withObservable = this.withObservable;
this.unsubscribe();
this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable));
}
private _unsubscribe() {
this.action = null;
this.scheduler = null;
this.withObservable = null;
}
}

0 comments on commit 1761f59

Please sign in to comment.