From 548ec2a3f3a74c87f40ee74df98f06b38d5c2c2b Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Fri, 15 Jul 2016 18:57:38 -0700 Subject: [PATCH] fix(schedulers): fix asap and animationFrame schedulers to execute across async boundaries. (#1820) The AsapScheduler and AnimationFrameSchedulers were totally busted. My bad. Now they execute their scheduled actions in batches. If actions reschedule while executing a batch, a new frame is requested for the rescheduled action to execute in. This PR also simplifies the public `Scheduler` and `Action` APIs. Implementation details like the `actions` queue and `active` boolean are now on the concrete implementations, so it's easier for people to implement the Scheduler API. This PR also renames `FutureAction` -> `AsyncAction` to conform to the same naming convention as the rest of the Action types. Fixes #1814 --- package.json | 1 + spec/Scheduler-spec.ts | 17 +- .../AnimationFrameScheduler-spec.ts | 88 ++++++++++ spec/schedulers/AsapScheduler-spec.ts | 40 +++++ spec/schedulers/QueueScheduler-spec.ts | 56 ++++++ spec/support/debug.opts | 2 +- src/MiscJSDoc.ts | 89 ---------- src/Rx.ts | 2 + src/Scheduler.ts | 64 ++++++- src/observable/GenerateObservable.ts | 21 ++- src/operator/bufferTime.ts | 8 +- src/operator/do.ts | 1 - src/operator/windowTime.ts | 8 +- src/scheduler/Action.ts | 43 +++-- src/scheduler/AnimationFrameAction.ts | 62 +++---- src/scheduler/AnimationFrameScheduler.ts | 37 +++- src/scheduler/AsapAction.ts | 62 +++---- src/scheduler/AsapScheduler.ts | 37 +++- src/scheduler/AsyncAction.ts | 153 +++++++++++++++++ src/scheduler/AsyncScheduler.ts | 53 +++++- src/scheduler/FutureAction.ts | 147 ---------------- src/scheduler/MiscJSDoc.ts | 85 --------- src/scheduler/QueueAction.ts | 36 +++- src/scheduler/QueueScheduler.ts | 46 +---- src/scheduler/VirtualTimeScheduler.ts | 161 +++++++----------- src/scheduler/animationFrame.ts | 3 +- src/scheduler/asap.ts | 3 +- src/scheduler/async.ts | 3 +- src/scheduler/queue.ts | 3 +- src/testing/HotObservable.ts | 1 - src/testing/SubscriptionLoggable.ts | 1 - src/testing/TestScheduler.ts | 4 +- 32 files changed, 735 insertions(+), 602 deletions(-) create mode 100644 spec/schedulers/AnimationFrameScheduler-spec.ts create mode 100644 spec/schedulers/QueueScheduler-spec.ts create mode 100644 src/scheduler/AsyncAction.ts delete mode 100644 src/scheduler/FutureAction.ts delete mode 100644 src/scheduler/MiscJSDoc.ts diff --git a/package.json b/package.json index 50f640eaba..c37b3bb1ba 100644 --- a/package.json +++ b/package.json @@ -94,6 +94,7 @@ "prepublish": "shx rm -rf ./typings && typings install && npm run build_all", "publish_docs": "./publish_docs.sh", "test_mocha": "mocha --opts spec/support/default.opts spec-js", + "debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js", "test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html", "test": "npm-run-all clean_spec build_spec test_mocha clean_spec", "tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js", diff --git a/spec/Scheduler-spec.ts b/spec/Scheduler-spec.ts index 6f481e1bf2..eca500960e 100644 --- a/spec/Scheduler-spec.ts +++ b/spec/Scheduler-spec.ts @@ -19,6 +19,21 @@ describe('Scheduler.queue', () => { expect(call2).to.be.true; }); + it('should schedule things recursively via this.schedule', () => { + let call1 = false; + let call2 = false; + Scheduler.queue.active = false; + Scheduler.queue.schedule(function (state) { + call1 = state.call1; + call2 = state.call2; + if (!call2) { + this.schedule({ call1: true, call2: true }); + } + }, 0, { call1: true, call2: false }); + expect(call1).to.be.true; + expect(call2).to.be.true; + }); + it('should schedule things in the future too', (done: MochaDone) => { let called = false; Scheduler.queue.schedule(() => { @@ -55,4 +70,4 @@ describe('Scheduler.queue', () => { }); }, 0); }); -}); \ No newline at end of file +}); diff --git a/spec/schedulers/AnimationFrameScheduler-spec.ts b/spec/schedulers/AnimationFrameScheduler-spec.ts new file mode 100644 index 0000000000..1b0b75e9e2 --- /dev/null +++ b/spec/schedulers/AnimationFrameScheduler-spec.ts @@ -0,0 +1,88 @@ +import {expect} from 'chai'; +import * as Rx from '../../dist/cjs/Rx'; + +const animationFrame = Rx.Scheduler.animationFrame; + +/** @test {Scheduler} */ +describe('Scheduler.animationFrame', () => { + it('should exist', () => { + expect(animationFrame).exist; + }); + + it('should schedule an action to happen later', (done: MochaDone) => { + let actionHappened = false; + animationFrame.schedule(() => { + actionHappened = true; + done(); + }); + if (actionHappened) { + done(new Error('Scheduled action happened synchronously')); + } + }); + + it('should execute recursively scheduled actions in separate asynchronous contexts', (done: MochaDone) => { + let syncExec1 = true; + let syncExec2 = true; + animationFrame.schedule(function (index) { + if (index === 0) { + this.schedule(1); + animationFrame.schedule(() => { syncExec1 = false; }); + } else if (index === 1) { + this.schedule(2); + animationFrame.schedule(() => { syncExec2 = false; }); + } else if (index === 2) { + this.schedule(3); + } else if (index === 3) { + if (!syncExec1 && !syncExec2) { + done(); + } else { + done(new Error('Execution happened synchronously.')); + } + } + }, 0, 0); + }); + + it('should cancel the animation frame if all scheduled actions unsubscribe before it executes', (done: MochaDone) => { + let animationFrameExec1 = false; + let animationFrameExec2 = false; + const action1 = animationFrame.schedule(() => { animationFrameExec1 = true; }); + const action2 = animationFrame.schedule(() => { animationFrameExec2 = true; }); + expect(animationFrame.scheduled).to.exist; + expect(animationFrame.actions.length).to.equal(2); + action1.unsubscribe(); + action2.unsubscribe(); + expect(animationFrame.actions.length).to.equal(0); + expect(animationFrame.scheduled).to.equal(undefined); + animationFrame.schedule(() => { + expect(animationFrameExec1).to.equal(false); + expect(animationFrameExec2).to.equal(false); + done(); + }); + }); + + it('should execute the rest of the scheduled actions if the first action is canceled', (done: MochaDone) => { + let actionHappened = false; + let firstSubscription = null; + let secondSubscription = null; + + firstSubscription = animationFrame.schedule(() => { + actionHappened = true; + if (secondSubscription) { + secondSubscription.unsubscribe(); + } + done(new Error('The first action should not have executed.')); + }); + + secondSubscription = animationFrame.schedule(() => { + if (!actionHappened) { + done(); + } + }); + + if (actionHappened) { + done(new Error('Scheduled action happened synchronously')); + } else { + firstSubscription.unsubscribe(); + } + }); +}); diff --git a/spec/schedulers/AsapScheduler-spec.ts b/spec/schedulers/AsapScheduler-spec.ts index 45ecd405d9..9cb1e338be 100644 --- a/spec/schedulers/AsapScheduler-spec.ts +++ b/spec/schedulers/AsapScheduler-spec.ts @@ -20,6 +20,46 @@ describe('Scheduler.asap', () => { } }); + it('should execute recursively scheduled actions in separate asynchronous contexts', (done: MochaDone) => { + let syncExec1 = true; + let syncExec2 = true; + asap.schedule(function (index) { + if (index === 0) { + this.schedule(1); + asap.schedule(() => { syncExec1 = false; }); + } else if (index === 1) { + this.schedule(2); + asap.schedule(() => { syncExec2 = false; }); + } else if (index === 2) { + this.schedule(3); + } else if (index === 3) { + if (!syncExec1 && !syncExec2) { + done(); + } else { + done(new Error('Execution happened synchronously.')); + } + } + }, 0, 0); + }); + + it('should cancel the setImmediate if all scheduled actions unsubscribe before it executes', (done: MochaDone) => { + let asapExec1 = false; + let asapExec2 = false; + const action1 = asap.schedule(() => { asapExec1 = true; }); + const action2 = asap.schedule(() => { asapExec2 = true; }); + expect(asap.scheduled).to.exist; + expect(asap.actions.length).to.equal(2); + action1.unsubscribe(); + action2.unsubscribe(); + expect(asap.actions.length).to.equal(0); + expect(asap.scheduled).to.equal(undefined); + asap.schedule(() => { + expect(asapExec1).to.equal(false); + expect(asapExec2).to.equal(false); + done(); + }); + }); + it('should execute the rest of the scheduled actions if the first action is canceled', (done: MochaDone) => { let actionHappened = false; let firstSubscription = null; diff --git a/spec/schedulers/QueueScheduler-spec.ts b/spec/schedulers/QueueScheduler-spec.ts new file mode 100644 index 0000000000..d7e99be869 --- /dev/null +++ b/spec/schedulers/QueueScheduler-spec.ts @@ -0,0 +1,56 @@ +import {expect} from 'chai'; +import * as Rx from '../../dist/cjs/Rx'; + +const Scheduler = Rx.Scheduler; +const queue = Scheduler.queue; + +/** @test {Scheduler} */ +describe('Scheduler.queue', () => { + it('should switch from synchronous to asynchronous at will', (done: MochaDone) => { + let lastExecTime = 0; + let asyncExec = false; + queue.schedule(function (index) { + if (index === 0) { + lastExecTime = queue.now(); + this.schedule(1, 100); + } else if (index === 1) { + if (queue.now() - lastExecTime < 100) { + done(new Error('Execution happened synchronously.')); + } else { + asyncExec = true; + lastExecTime = queue.now(); + this.schedule(2, 0); + } + } else if (index === 2) { + if (asyncExec === false) { + done(new Error('Execution happened synchronously.')); + } else { + done(); + } + } + }, 0, 0); + asyncExec = false; + }); + it('should unsubscribe the rest of the scheduled actions if an action throws an error', () => { + const actions = []; + let action2Exec = false; + let action3Exec = false; + let errorValue = undefined; + try { + queue.schedule(() => { + actions.push( + queue.schedule(() => { throw new Error('oops'); }), + queue.schedule(() => { action2Exec = true; }), + queue.schedule(() => { action3Exec = true; }) + ); + }); + } catch (e) { + errorValue = e; + } + expect(actions.every((action) => action.isUnsubscribed)).to.be.true; + expect(action2Exec).to.be.false; + expect(action3Exec).to.be.false; + expect(errorValue).exist; + expect(errorValue.message).to.equal('oops'); + }); +}); diff --git a/spec/support/debug.opts b/spec/support/debug.opts index cd99a064bd..3954bc6507 100644 --- a/spec/support/debug.opts +++ b/spec/support/debug.opts @@ -8,7 +8,7 @@ --bail --full-trace --check-leaks ---globals WebSocket,FormData +--globals WebSocket,FormData,XDomainRequest,ActiveXObject --recursive --timeout 100000 diff --git a/src/MiscJSDoc.ts b/src/MiscJSDoc.ts index 909f6bc819..dce64e6703 100644 --- a/src/MiscJSDoc.ts +++ b/src/MiscJSDoc.ts @@ -8,8 +8,6 @@ import {Subscriber} from './Subscriber'; import {TeardownLogic} from './Subscription'; import {Observable} from './Observable'; -import {Subscription} from './Subscription'; -import {Action} from './scheduler/Action'; import './scheduler/MiscJSDoc'; import './observable/dom/MiscJSDoc'; @@ -130,90 +128,3 @@ export class ObserverDoc { return void 0; } } - -/** - * An execution context and a data structure to order tasks and schedule their - * execution. Provides a notion of (potentially virtual) time, through the - * `now()` getter method. - * - * Each unit of work in a Scheduler is called an {@link Action}. - * - * ```ts - * interface Scheduler { - * now(): number; - * schedule(work, delay?, state?): Subscription; - * flush(): void; - * active: boolean; - * actions: Action[]; - * scheduledId: number; - * } - * ``` - * - * @interface - * @name Scheduler - * @noimport true - */ -export class SchedulerDoc { - /** - * A getter method that returns a number representing the current time - * (at the time this function was called) according to the scheduler's own - * internal clock. - * @return {number} A number that represents the current time. May or may not - * have a relation to wall-clock time. May or may not refer to a time unit - * (e.g. milliseconds). - */ - now(): number { - return 0; - } - - /** - * Schedules a function, `work`, for execution. May happen at some point in - * the future, according to the `delay` parameter, if specified. May be passed - * some context object, `state`, which will be passed to the `work` function. - * - * The given arguments will be processed an stored as an Action object in a - * queue of actions. - * - * @param {function(state: ?T): ?Subscription} work A function representing a - * task, or some unit of work to be executed by the Scheduler. - * @param {number} [delay] Time to wait before executing the work, where the - * time unit is implicit and defined by the Scheduler itself. - * @param {T} [state] Some contextual data that the `work` function uses when - * called by the Scheduler. - * @return {Subscription} A subscription in order to be able to unsubscribe - * the scheduled work. - */ - schedule(work: (state?: T) => Subscription | void, delay?: number, state?: T): Subscription { - return void 0; - } - - /** - * Prompt the Scheduler to execute all of its queued actions, therefore - * clearing its queue. - * @return {void} - */ - flush(): void { - return void 0; - } - - /** - * A flag to indicate whether the Scheduler is currently executing a batch of - * queued actions. - * @type {boolean} - */ - active: boolean = false; - - /** - * The queue of scheduled actions as an array. - * @type {Action[]} - */ - actions: Action[] = []; - - /** - * An internal ID used to track the latest asynchronous task such as those - * coming from `setTimeout`, `setInterval`, `requestAnimationFrame`, and - * others. - * @type {number} - */ - scheduledId: number = 0; -} diff --git a/src/Rx.ts b/src/Rx.ts index 1366783df7..2962a89173 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -184,6 +184,8 @@ import observable from 'symbol-observable'; * asynchronous conversions. * @property {Scheduler} async Schedules work with `setInterval`. Use this for * time-based operations. + * @property {Scheduler} animationFrame Schedules work with `requestAnimationFrame`. + * Use this for synchronizing with the platform's painting */ let Scheduler = { asap, diff --git a/src/Scheduler.ts b/src/Scheduler.ts index e2477f4d0a..29e5dae2b5 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -1,11 +1,59 @@ -import {Subscription} from './Subscription'; import {Action} from './scheduler/Action'; +import {Subscription} from './Subscription'; + +/** + * An execution context and a data structure to order tasks and schedule their + * execution. Provides a notion of (potentially virtual) time, through the + * `now()` getter method. + * + * Each unit of work in a Scheduler is called an {@link Action}. + * + * ```ts + * class Scheduler { + * now(): number; + * schedule(work, delay?, state?): Subscription; + * } + * ``` + * + * @class Scheduler + */ +export class Scheduler { + + public static now: () => number = Date.now ? Date.now : () => +new Date(); + + constructor(private SchedulerAction: typeof Action, + now: () => number = Scheduler.now) { + this.now = now; + } + + /** + * A getter method that returns a number representing the current time + * (at the time this function was called) according to the scheduler's own + * internal clock. + * @return {number} A number that represents the current time. May or may not + * have a relation to wall-clock time. May or may not refer to a time unit + * (e.g. milliseconds). + */ + public now: () => number; -export interface Scheduler { - now(): number; - schedule(work: (state?: T) => Subscription | void, delay?: number, state?: T): Subscription; - flush(): void; - active: boolean; - actions: Action[]; // XXX: use `any` to remove type param `T` from `Scheduler`. - scheduledId: number; + /** + * Schedules a function, `work`, for execution. May happen at some point in + * the future, according to the `delay` parameter, if specified. May be passed + * some context object, `state`, which will be passed to the `work` function. + * + * The given arguments will be processed an stored as an Action object in a + * queue of actions. + * + * @param {function(state: ?T): ?Subscription} work A function representing a + * task, or some unit of work to be executed by the Scheduler. + * @param {number} [delay] Time to wait before executing the work, where the + * time unit is implicit and defined by the Scheduler itself. + * @param {T} [state] Some contextual data that the `work` function uses when + * called by the Scheduler. + * @return {Subscription} A subscription in order to be able to unsubscribe + * the scheduled work. + */ + public schedule(work: (state?: T) => void, delay: number = 0, state?: T): Subscription { + return new this.SchedulerAction(this, work).schedule(state, delay); + } } diff --git a/src/observable/GenerateObservable.ts b/src/observable/GenerateObservable.ts index 71ff576907..290d428df2 100644 --- a/src/observable/GenerateObservable.ts +++ b/src/observable/GenerateObservable.ts @@ -1,9 +1,8 @@ -import {Observable} from '../Observable' ; import {Scheduler} from '../Scheduler'; +import {Action} from '../scheduler/Action'; +import {Observable} from '../Observable' ; import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; -import {Action} from '../scheduler/Action'; - import {isScheduler} from '../util/isScheduler'; const selfSelector = (value: T) => value; @@ -70,16 +69,16 @@ export class GenerateObservable extends Observable { * to send out observer messages. * * - * + * * @example Produces sequence of 0, 1, 2, ... 9, then completes. * var res = Rx.Observable.generate(0, x => x < 10, x => x + 1, x => x); - * + * * @example Using asap scheduler, produces sequence of 2, 3, 5, then completes. * var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, x => x + 1, Rx.Scheduler.asap); * * @see {@link from} * @see {@link create} - * + * * @param {S} initialState Initial state. * @param {function (state: S): boolean} condition Condition to terminate generation (upon returning false). * @param {function (state: S): S} iterate Iteration step function. @@ -98,12 +97,12 @@ export class GenerateObservable extends Observable { * producing the sequence's elements, using the specified scheduler * to send out observer messages. * The overload uses state as an emitted value. - * + * * * * @example Produces sequence of 0, 1, 2, ... 9, then completes. * var res = Rx.Observable.generate(0, x => x < 10, x => x + 1); - * + * * @example Using asap scheduler, produces sequence of 1, 2, 4, then completes. * var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, Rx.Scheduler.asap); * @@ -127,7 +126,7 @@ export class GenerateObservable extends Observable { * to send out observer messages. * The overload accepts options object that might contain inital state, iterate, * condition and scheduler. - * + * * * * @example Produces sequence of 0, 1, 2, ... 9, then completes. @@ -151,7 +150,7 @@ export class GenerateObservable extends Observable { * to send out observer messages. * The overload accepts options object that might contain inital state, iterate, * condition, result selector and scheduler. - * + * * * * @example Produces sequence of 0, 1, 2, ... 9, then completes. @@ -293,4 +292,4 @@ export class GenerateObservable extends Observable { } return (>>this).schedule(state); } -} \ No newline at end of file +} diff --git a/src/operator/bufferTime.ts b/src/operator/bufferTime.ts index 990cff8e8d..7186fe2589 100644 --- a/src/operator/bufferTime.ts +++ b/src/operator/bufferTime.ts @@ -1,10 +1,10 @@ -import {Operator} from '../Operator'; -import {Subscriber} from '../Subscriber'; -import {Subscription} from '../Subscription'; -import {Observable} from '../Observable'; import {Scheduler} from '../Scheduler'; import {Action} from '../scheduler/Action'; +import {Operator} from '../Operator'; import {async} from '../scheduler/async'; +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; import {isScheduler} from '../util/isScheduler'; /** diff --git a/src/operator/do.ts b/src/operator/do.ts index cacb114a93..ef4778c991 100644 --- a/src/operator/do.ts +++ b/src/operator/do.ts @@ -119,4 +119,3 @@ class DoSubscriber extends Subscriber { } } } - diff --git a/src/operator/windowTime.ts b/src/operator/windowTime.ts index 5b18595e92..a2778c1db8 100644 --- a/src/operator/windowTime.ts +++ b/src/operator/windowTime.ts @@ -1,11 +1,11 @@ +import {Scheduler} from '../Scheduler'; +import {Action} from '../scheduler/Action'; +import {Subject} from '../Subject'; import {Operator} from '../Operator'; +import {async} from '../scheduler/async'; import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; -import {Subject} from '../Subject'; import {Subscription} from '../Subscription'; -import {Scheduler} from '../Scheduler'; -import {Action} from '../scheduler/Action'; -import {async} from '../scheduler/async'; /** * Branch out the source Observable values as a nested Observable periodically diff --git a/src/scheduler/Action.ts b/src/scheduler/Action.ts index a294e76b87..4d4c70fc4c 100644 --- a/src/scheduler/Action.ts +++ b/src/scheduler/Action.ts @@ -1,12 +1,35 @@ -import {Subscription} from '../Subscription'; import {Scheduler} from '../Scheduler'; +import {Subscription} from '../Subscription'; -export interface Action extends Subscription { - work: (state?: T) => void|Subscription; - state?: T; - delay?: number; - schedule(state?: T, delay?: number): void; - execute(): void; - scheduler: Scheduler; - error: any; -} \ No newline at end of file +/** + * A unit of work to be executed in a {@link Scheduler}. An action is typically + * created from within a Scheduler and an RxJS user does not need to concern + * themselves about creating and manipulating an Action. + * + * ```ts + * class Action extends Subscription { + * new (scheduler: Scheduler, work: (state?: T) => void); + * schedule(state?: T, delay: number = 0): Subscription; + * } + * ``` + * + * @class Action + */ +export class Action extends Subscription { + constructor(scheduler: Scheduler, work: (state?: T) => void) { + super(); + } + /** + * Schedules this action on its parent Scheduler for execution. May be passed + * some context object, `state`. May happen at some point in the future, + * according to the `delay` parameter, if specified. + * @param {T} [state] Some contextual data that the `work` function uses when + * called by the Scheduler. + * @param {number} [delay] Time to wait before executing the work, where the + * time unit is implicit and defined by the Scheduler. + * @return {void} + */ + public schedule(state?: T, delay: number = 0): Subscription { + return this; + } +} diff --git a/src/scheduler/AnimationFrameAction.ts b/src/scheduler/AnimationFrameAction.ts index 321563904b..4924bf44c7 100644 --- a/src/scheduler/AnimationFrameAction.ts +++ b/src/scheduler/AnimationFrameAction.ts @@ -1,44 +1,46 @@ -import {Action} from './Action'; -import {FutureAction} from './FutureAction'; +import {AsyncAction} from './AsyncAction'; import {AnimationFrame} from '../util/AnimationFrame'; +import {AnimationFrameScheduler} from './AnimationFrameScheduler'; /** * We need this JSDoc comment for affecting ESDoc. * @ignore * @extends {Ignored} */ -export class AnimationFrameAction extends FutureAction { +export class AnimationFrameAction extends AsyncAction { - protected _schedule(state?: T, delay: number = 0): Action { - if (delay > 0) { - return super._schedule(state, delay); + constructor(protected scheduler: AnimationFrameScheduler, + protected work: (state?: T) => void) { + super(scheduler, work); + } + + protected requestAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any { + // If delay is greater than 0, request as an async action. + if (delay !== null && delay > 0) { + return super.requestAsyncId(scheduler, id, delay); } - this.delay = delay; - this.state = state; - const {scheduler} = this; + // Push the action to the end of the scheduler queue. scheduler.actions.push(this); - if (!scheduler.scheduledId) { - scheduler.scheduledId = AnimationFrame.requestAnimationFrame(() => { - scheduler.scheduledId = null; - scheduler.flush(); - }); - } - return this; + // If an animation frame has already been requested, don't request another + // one. If an animation frame hasn't been requested yet, request one. Return + // the current animation frame request id. + return scheduler.scheduled || (scheduler.scheduled = AnimationFrame.requestAnimationFrame( + scheduler.flush.bind(scheduler, null) + )); } - - protected _unsubscribe(): void { - - const {scheduler} = this; - const {scheduledId, actions} = scheduler; - - super._unsubscribe(); - - if (actions.length === 0) { - scheduler.active = false; - if (scheduledId != null) { - scheduler.scheduledId = null; - AnimationFrame.cancelAnimationFrame(scheduledId); - } + protected recycleAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any { + // If delay exists and is greater than 0, recycle as an async action. + if (delay !== null && delay > 0) { + return super.recycleAsyncId(scheduler, id, delay); + } + // If the scheduler queue is empty, cancel the requested animation frame and + // set the scheduled flag to undefined so the next AnimationFrameAction will + // request its own. + if (scheduler.actions.length === 0) { + AnimationFrame.cancelAnimationFrame(id); + scheduler.scheduled = undefined; } + // Return undefined so the action knows to request a new async id if it's rescheduled. + return undefined; } } diff --git a/src/scheduler/AnimationFrameScheduler.ts b/src/scheduler/AnimationFrameScheduler.ts index 5ca4a9ef40..ed0c4087bf 100644 --- a/src/scheduler/AnimationFrameScheduler.ts +++ b/src/scheduler/AnimationFrameScheduler.ts @@ -1,10 +1,31 @@ -import {Action} from './Action'; -import {Subscription} from '../Subscription'; -import {QueueScheduler} from './QueueScheduler'; -import {AnimationFrameAction} from './AnimationFrameAction'; - -export class AnimationFrameScheduler extends QueueScheduler { - scheduleNow(work: (x?: T) => Subscription, state?: T): Action { - return new AnimationFrameAction(this, work).schedule(state); +import {AsyncAction} from './AsyncAction'; +import {AsyncScheduler} from './AsyncScheduler'; + +export class AnimationFrameScheduler extends AsyncScheduler { + public flush(): void { + + this.active = true; + this.scheduled = undefined; + + const {actions} = this; + let error: any; + let index: number = -1; + let count: number = actions.length; + let action: AsyncAction = actions.shift(); + + do { + if (error = action.execute(action.state, action.delay)) { + break; + } + } while (++index < count && (action = actions.shift())); + + this.active = false; + + if (error) { + while (++index < count && (action = actions.shift())) { + action.unsubscribe(); + } + throw error; + } } } diff --git a/src/scheduler/AsapAction.ts b/src/scheduler/AsapAction.ts index 4ff8f07ea4..1fcd724480 100644 --- a/src/scheduler/AsapAction.ts +++ b/src/scheduler/AsapAction.ts @@ -1,44 +1,46 @@ -import {Action} from './Action'; import {Immediate} from '../util/Immediate'; -import {FutureAction} from './FutureAction'; +import {AsyncAction} from './AsyncAction'; +import {AsapScheduler} from './AsapScheduler'; /** * We need this JSDoc comment for affecting ESDoc. * @ignore * @extends {Ignored} */ -export class AsapAction extends FutureAction { +export class AsapAction extends AsyncAction { - protected _schedule(state?: T, delay: number = 0): Action { - if (delay > 0) { - return super._schedule(state, delay); + constructor(protected scheduler: AsapScheduler, + protected work: (state?: T) => void) { + super(scheduler, work); + } + + protected requestAsyncId(scheduler: AsapScheduler, id?: any, delay: number = 0): any { + // If delay is greater than 0, request as an async action. + if (delay !== null && delay > 0) { + return super.requestAsyncId(scheduler, id, delay); } - this.delay = delay; - this.state = state; - const {scheduler} = this; + // Push the action to the end of the scheduler queue. scheduler.actions.push(this); - if (!scheduler.scheduledId) { - scheduler.scheduledId = Immediate.setImmediate(() => { - scheduler.scheduledId = null; - scheduler.flush(); - }); - } - return this; + // If a microtask has already been scheduled, don't schedule another + // one. If a microtask hasn't been scheduled yet, schedule one now. Return + // the current scheduled microtask id. + return scheduler.scheduled || (scheduler.scheduled = Immediate.setImmediate( + scheduler.flush.bind(scheduler, null) + )); } - - protected _unsubscribe(): void { - - const {scheduler} = this; - const {scheduledId, actions} = scheduler; - - super._unsubscribe(); - - if (actions.length === 0) { - scheduler.active = false; - if (scheduledId != null) { - scheduler.scheduledId = null; - Immediate.clearImmediate(scheduledId); - } + protected recycleAsyncId(scheduler: AsapScheduler, id?: any, delay: number = 0): any { + // If delay exists and is greater than 0, recycle as an async action. + if (delay !== null && delay > 0) { + return super.recycleAsyncId(scheduler, id, delay); + } + // If the scheduler queue is empty, cancel the requested microtask and + // set the scheduled flag to undefined so the next AsapAction will schedule + // its own. + if (scheduler.actions.length === 0) { + Immediate.clearImmediate(id); + scheduler.scheduled = undefined; } + // Return undefined so the action knows to request a new async id if it's rescheduled. + return undefined; } } diff --git a/src/scheduler/AsapScheduler.ts b/src/scheduler/AsapScheduler.ts index b09bf286d4..551912a1cd 100644 --- a/src/scheduler/AsapScheduler.ts +++ b/src/scheduler/AsapScheduler.ts @@ -1,10 +1,31 @@ -import {Action} from './Action'; -import {AsapAction} from './AsapAction'; -import {Subscription} from '../Subscription'; -import {QueueScheduler} from './QueueScheduler'; - -export class AsapScheduler extends QueueScheduler { - scheduleNow(work: (x?: T) => Subscription, state?: T): Action { - return new AsapAction(this, work).schedule(state); +import {AsyncAction} from './AsyncAction'; +import {AsyncScheduler} from './AsyncScheduler'; + +export class AsapScheduler extends AsyncScheduler { + public flush(): void { + + this.active = true; + this.scheduled = undefined; + + const {actions} = this; + let error: any; + let index: number = -1; + let count: number = actions.length; + let action: AsyncAction = actions.shift(); + + do { + if (error = action.execute(action.state, action.delay)) { + break; + } + } while (++index < count && (action = actions.shift())); + + this.active = false; + + if (error) { + while (++index < count && (action = actions.shift())) { + action.unsubscribe(); + } + throw error; + } } } diff --git a/src/scheduler/AsyncAction.ts b/src/scheduler/AsyncAction.ts new file mode 100644 index 0000000000..b95a4e60fb --- /dev/null +++ b/src/scheduler/AsyncAction.ts @@ -0,0 +1,153 @@ +import {root} from '../util/root'; +import {Action} from './Action'; +import {Subscription} from '../Subscription'; +import {AsyncScheduler} from './AsyncScheduler'; + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +export class AsyncAction extends Action { + + public id: any; + public state: T; + public delay: number; + protected pending: boolean = false; + + constructor(protected scheduler: AsyncScheduler, + protected work: (state?: T) => void) { + super(scheduler, work); + } + + public schedule(state?: T, delay: number = 0): Subscription { + + if (this.isUnsubscribed) { + return this; + } + + // Always replace the current state with the new state. + this.state = state; + + // Set the pending flag indicating that this action has been scheduled, or + // has recursively rescheduled itself. + this.pending = true; + + const id = this.id; + const scheduler = this.scheduler; + + // + // Important implementation note: + // + // Actions only execute once by default, unless rescheduled from within the + // scheduled callback. This allows us to implement single and repeat + // actions via the same code path, without adding API surface area, as well + // as mimic traditional recursion but across asynchronous boundaries. + // + // However, JS runtimes and timers distinguish between intervals achieved by + // serial `setTimeout` calls vs. a single `setInterval` call. An interval of + // serial `setTimeout` calls can be individually delayed, which delays + // scheduling the next `setTimeout`, and so on. `setInterval` attempts to + // guarantee the interval callback will be invoked more precisely to the + // interval period, regardless of load. + // + // Therefore, we use `setInterval` to schedule single and repeat actions. + // If the action reschedules itself with the same delay, the interval is not + // canceled. If the action doesn't reschedule, or reschedules with a + // different delay, the interval will be canceled after scheduled callback + // execution. + // + if (id != null) { + this.id = this.recycleAsyncId(scheduler, id, delay); + } + + this.delay = delay; + // If this action has already an async Id, don't request a new one. + this.id = this.id || this.requestAsyncId(scheduler, this.id, delay); + + return this; + } + + protected requestAsyncId(scheduler: AsyncScheduler, id?: any, delay: number = 0): any { + return root.setInterval(scheduler.flush.bind(scheduler, this), delay); + } + + protected recycleAsyncId(scheduler: AsyncScheduler, id: any, delay: number = 0): any { + // If this action is rescheduled with the same delay time, don't clear the interval id. + if (delay !== null && this.delay === delay) { + return id; + } + // Otherwise, if the action's delay time is different from the current delay, + // clear the interval id + return root.clearInterval(id) && undefined || undefined; + } + + /** + * Immediately executes this action and the `work` it contains. + * @return {any} + */ + public execute(state: T, delay: number): any { + + if (this.isUnsubscribed) { + return new Error('executing a cancelled action'); + } + + this.pending = false; + const error = this._execute(state, delay); + if (error) { + return error; + } else if (this.pending === false && this.id != null) { + // Dequeue if the action didn't reschedule itself. Don't call + // unsubscribe(), because the action could reschedule later. + // For example: + // ``` + // scheduler.schedule(function doWork(counter) { + // /* ... I'm a busy worker bee ... */ + // var originalAction = this; + // /* wait 100ms before rescheduling the action */ + // setTimeout(function () { + // originalAction.schedule(counter + 1); + // }, 100); + // }, 1000); + // ``` + this.id = this.recycleAsyncId(this.scheduler, this.id, null); + } + } + + protected _execute(state: T, delay: number): any { + let errored: boolean = false; + let errorValue: any = undefined; + try { + this.work(state); + } catch (e) { + errored = true; + errorValue = !!e && e || new Error(e); + } + if (errored) { + this.unsubscribe(); + return errorValue; + } + } + + protected _unsubscribe() { + + const id = this.id; + const scheduler = this.scheduler; + const actions = scheduler.actions; + const index = actions.indexOf(this); + + this.work = null; + this.delay = null; + this.state = null; + this.pending = false; + this.scheduler = null; + + if (index !== -1) { + actions.splice(index, 1); + } + + if (id != null) { + this.id = this.recycleAsyncId(scheduler, id, null); + } + } +} diff --git a/src/scheduler/AsyncScheduler.ts b/src/scheduler/AsyncScheduler.ts index db5af54f55..fc68d358eb 100644 --- a/src/scheduler/AsyncScheduler.ts +++ b/src/scheduler/AsyncScheduler.ts @@ -1,10 +1,47 @@ -import {Action} from './Action'; -import {FutureAction} from './FutureAction'; -import {Subscription} from '../Subscription'; -import {QueueScheduler} from './QueueScheduler'; - -export class AsyncScheduler extends QueueScheduler { - scheduleNow(work: (x?: T) => Subscription, state?: T): Action { - return new FutureAction(this, work).schedule(state, 0); +import {Scheduler} from '../Scheduler'; +import {AsyncAction} from './AsyncAction'; + +export class AsyncScheduler extends Scheduler { + public actions: Array> = []; + /** + * A flag to indicate whether the Scheduler is currently executing a batch of + * queued actions. + * @type {boolean} + */ + public active: boolean = false; + /** + * An internal ID used to track the latest asynchronous task such as those + * coming from `setTimeout`, `setInterval`, `requestAnimationFrame`, and + * others. + * @type {any} + */ + public scheduled: any = undefined; + + public flush(action: AsyncAction): void { + + const {actions} = this; + + if (this.active) { + actions.push(action); + return; + } + + let error: any; + this.active = true; + + do { + if (error = action.execute(action.state, action.delay)) { + break; + } + } while (action = actions.shift()); // exhaust the scheduler queue + + this.active = false; + + if (error) { + while (action = actions.shift()) { + action.unsubscribe(); + } + throw error; + } } } diff --git a/src/scheduler/FutureAction.ts b/src/scheduler/FutureAction.ts deleted file mode 100644 index 58cd9c37ac..0000000000 --- a/src/scheduler/FutureAction.ts +++ /dev/null @@ -1,147 +0,0 @@ -import {root} from '../util/root'; -import {Action} from './Action'; -import {Scheduler} from '../Scheduler'; -import {Subscription} from '../Subscription'; - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class FutureAction extends Subscription implements Action { - - public id: number; - public state: T; - public delay: number; - public error: any; - - private pending: boolean = false; - - constructor(public scheduler: Scheduler, - public work: (x?: T) => Subscription | void) { - super(); - } - - execute() { - if (this.isUnsubscribed) { - this.error = new Error('executing a cancelled action'); - } else { - try { - this.work(this.state); - } catch (e) { - this.unsubscribe(); - this.error = e; - } - } - } - - schedule(state?: T, delay: number = 0): Action { - if (this.isUnsubscribed) { - return this; - } - return this._schedule(state, delay); - } - - protected _schedule(state?: T, delay: number = 0): Action { - - // Always replace the current state with the new state. - this.state = state; - - // Set the pending flag indicating that this action has been scheduled, or - // has recursively rescheduled itself. - this.pending = true; - - const id = this.id; - // If this action has an intervalID and the specified delay matches the - // delay we used to create the intervalID, don't call `setInterval` again. - if (id != null && this.delay === delay) { - return this; - } - - this.delay = delay; - - // If this action has an intervalID, but was rescheduled with a different - // `delay` time, cancel the current intervalID and call `setInterval` with - // the new `delay` time. - if (id != null) { - this.id = null; - root.clearInterval(id); - } - - // - // Important implementation note: - // - // By default, FutureAction only executes once. However, Actions have the - // ability to be rescheduled from within the scheduled callback (mimicking - // recursion for asynchronous methods). This allows us to implement single - // and repeated actions with the same code path without adding API surface - // area, and implement tail-call optimization over asynchronous boundaries. - // - // However, JS runtimes make a distinction between intervals scheduled by - // repeatedly calling `setTimeout` vs. a single `setInterval` call, with - // the latter providing a better guarantee of precision. - // - // In order to accommodate both single and repeatedly rescheduled actions, - // use `setInterval` here for both cases. By default, the interval will be - // canceled after its first execution, or if the action schedules itself to - // run again with a different `delay` time. - // - // If the action recursively schedules itself to run again with the same - // `delay` time, the interval is not canceled, but allowed to loop again. - // The check of whether the interval should be canceled or not is run every - // time the interval is executed. The first time an action fails to - // reschedule itself, the interval is canceled. - // - this.id = root.setInterval(() => { - - this.pending = false; - const {id, scheduler} = this; - scheduler.actions.push(this); - scheduler.flush(); - - // - // Terminate this interval if the action didn't reschedule itself. - // Don't call `this.unsubscribe()` here, because the action could be - // rescheduled later. For example: - // - // ``` - // scheduler.schedule(function doWork(counter) { - // /* ... I'm a busy worker bee ... */ - // var originalAction = this; - // /* wait 100ms before rescheduling this action again */ - // setTimeout(function () { - // originalAction.schedule(counter + 1); - // }, 100); - // }, 1000); - // ``` - - if (this.pending === false && id != null) { - this.id = null; - root.clearInterval(id); - } - }, delay); - - return this; - } - - protected _unsubscribe() { - - this.pending = false; - const {id, scheduler} = this; - const {actions} = scheduler; - const index = actions.indexOf(this); - - if (id != null) { - this.id = null; - root.clearInterval(id); - } - - if (index !== -1) { - actions.splice(index, 1); - } - - this.work = null; - this.state = null; - this.scheduler = null; - } -} diff --git a/src/scheduler/MiscJSDoc.ts b/src/scheduler/MiscJSDoc.ts deleted file mode 100644 index 1ca6d9ac7e..0000000000 --- a/src/scheduler/MiscJSDoc.ts +++ /dev/null @@ -1,85 +0,0 @@ -import {Subscription} from '../Subscription'; -import {Scheduler} from '../Scheduler'; - -/** - * A unit of work to be executed in a {@link Scheduler}. An action is typically - * created from within a Scheduler and an RxJS user does not need to concern - * themselves about creating and manipulating an Action. - * - * ```ts - * interface Action extends Subscription { - * work: (state?: any) => void|Subscription; - * state?: any; - * delay?: number; - * schedule(state?: any, delay?: number): void; - * execute(): void; - * scheduler: Scheduler; - * error: any; - * } - * ``` - * - * @interface - * @name Action - * @noimport true - */ -export class ActionDoc extends Subscription { - /** - * The function that represents the raw work to be executed on a Scheduler. - * @param {any} [state] Some contextual data that the `work` function uses - * when called by the Scheduler. - * @return {?Subscription} A subscription in order to be able to unsubscribe - * the scheduled work. - */ - work(state?: any): void|Subscription { - return void 0; - } - - /** - * The current state. This is the object that will be given to the `work` - * method. - * @type {any} - */ - state: any = void 0; - - /** - * Represents the time (relative to the Scheduler's own clock) when this - * action should be executed. - * @type {number} - */ - delay: number = 0; - - /** - * Schedules this action on its parent Scheduler for execution. May be passed - * some context object, `state`. May happen at some point in the future, - * according to the `delay` parameter, if specified. - * @param {any} [state] Some contextual data that the `work` function uses when - * called by the Scheduler. - * @param {number} [delay] Time to wait before executing the work, where the - * time unit is implicit and defined by the Scheduler. - * @return {void} - */ - schedule(state?: any, delay?: number): void { - return void 0; - } - - /** - * Immediately executes this action and the `work` it contains. - * @return {any} - */ - execute(): void { - return void 0; - } - - /** - * The Scheduler which owns this action. - * @type {Scheduler} - */ - scheduler: Scheduler = null; - - /** - * A reference to the latest error that may have occurred during action - * execution. - * @type {any} - */ - error: any; -} diff --git a/src/scheduler/QueueAction.ts b/src/scheduler/QueueAction.ts index 5d29e69e5f..2c806f0c58 100644 --- a/src/scheduler/QueueAction.ts +++ b/src/scheduler/QueueAction.ts @@ -1,21 +1,41 @@ -import {Action} from './Action'; -import {FutureAction} from './FutureAction'; +import {AsyncAction} from './AsyncAction'; +import {Subscription} from '../Subscription'; +import {QueueScheduler} from './QueueScheduler'; /** * We need this JSDoc comment for affecting ESDoc. * @ignore * @extends {Ignored} */ -export class QueueAction extends FutureAction { - protected _schedule(state?: T, delay: number = 0): Action { +export class QueueAction extends AsyncAction { + + constructor(protected scheduler: QueueScheduler, + protected work: (state?: T) => void) { + super(scheduler, work); + } + + public schedule(state?: T, delay: number = 0): Subscription { if (delay > 0) { - return super._schedule(state, delay); + return super.schedule(state, delay); } this.delay = delay; this.state = state; - const scheduler = this.scheduler; - scheduler.actions.push(this); - scheduler.flush(); + this.scheduler.flush(this); return this; } + + public execute(state: T, delay: number): any { + return (delay > 0 || this.isUnsubscribed) ? + super.execute(state, delay) : + this._execute(state, delay) ; + } + + protected requestAsyncId(scheduler: QueueScheduler, id?: any, delay: number = 0): any { + // If delay is greater than 0, enqueue as an async action. + if (delay !== null && delay > 0) { + return super.requestAsyncId(scheduler, id, delay); + } + // Otherwise flush the scheduler starting with this action. + return scheduler.flush(this); + } } diff --git a/src/scheduler/QueueScheduler.ts b/src/scheduler/QueueScheduler.ts index 84e4237159..e2974d9740 100644 --- a/src/scheduler/QueueScheduler.ts +++ b/src/scheduler/QueueScheduler.ts @@ -1,46 +1,4 @@ -import {Scheduler} from '../Scheduler'; -import {QueueAction} from './QueueAction'; -import {Subscription} from '../Subscription'; -import {FutureAction} from './FutureAction'; -import {Action} from './Action'; +import {AsyncScheduler} from './AsyncScheduler'; -export class QueueScheduler implements Scheduler { - public active: boolean = false; - public actions: QueueAction[] = []; // XXX: use `any` to remove type param `T` from `VirtualTimeScheduler`. - public scheduledId: number = null; - - now() { - return Date.now(); - } - - flush() { - if (this.active || this.scheduledId) { - return; - } - this.active = true; - const actions = this.actions; - // XXX: use `any` to remove type param `T` from `VirtualTimeScheduler`. - for (let action: QueueAction = null; action = actions.shift(); ) { - action.execute(); - if (action.error) { - this.active = false; - throw action.error; - } - } - this.active = false; - } - - schedule(work: (x?: T) => Subscription | void, delay: number = 0, state?: T): Subscription { - return (delay <= 0) ? - this.scheduleNow(work, state) : - this.scheduleLater(work, delay, state); - } - - scheduleNow(work: (x?: T) => Subscription | void, state?: T): Action { - return new QueueAction(this, work).schedule(state); - } - - scheduleLater(work: (x?: T) => Subscription | void, delay: number, state?: T): Action { - return new FutureAction(this, work).schedule(state, delay); - } +export class QueueScheduler extends AsyncScheduler { } diff --git a/src/scheduler/VirtualTimeScheduler.ts b/src/scheduler/VirtualTimeScheduler.ts index 1b0e7a6775..76de9f641b 100644 --- a/src/scheduler/VirtualTimeScheduler.ts +++ b/src/scheduler/VirtualTimeScheduler.ts @@ -1,68 +1,41 @@ -import {Scheduler} from '../Scheduler'; +import {AsyncAction} from './AsyncAction'; import {Subscription} from '../Subscription'; -import {Action} from './Action'; +import {AsyncScheduler} from './AsyncScheduler'; -export class VirtualTimeScheduler implements Scheduler { - actions: Action[] = []; // XXX: use `any` to remove type param `T` from `VirtualTimeScheduler`. - active: boolean = false; - scheduledId: number = null; - index: number = 0; - sorted: boolean = false; - frame: number = 0; - maxFrames: number = 750; +export class VirtualTimeScheduler extends AsyncScheduler { protected static frameTimeFactor: number = 10; - now() { - return this.frame; - } + public frame: number = 0; + public index: number = -1; - flush() { - const actions = this.actions; - const maxFrames = this.maxFrames; - while (actions.length > 0) { - let action = actions.shift(); - this.frame = action.delay; - if (this.frame <= maxFrames) { - action.execute(); - if (action.error) { - actions.length = 0; - this.frame = 0; - throw action.error; - } - } else { - break; - } - } - actions.length = 0; - this.frame = 0; + constructor(SchedulerAction: typeof AsyncAction = VirtualAction, + public maxFrames: number = 750) { + super(SchedulerAction, () => this.frame); } - addAction(action: Action): void { - const actions: Action[] = this.actions; + /** + * Prompt the Scheduler to execute all of its queued actions, therefore + * clearing its queue. + * @return {void} + */ + public flush(): void { - actions.push(action); + const {actions, maxFrames} = this; + let error: any, action: AsyncAction; - actions.sort((a: VirtualAction, b: VirtualAction) => { - if (a.delay === b.delay) { - if (a.index === b.index) { - return 0; - } else if (a.index > b.index) { - return 1; - } else { - return -1; - } - } else if (a.delay > b.delay) { - return 1; - } else { - return -1; + while ((action = actions.shift()) && (this.frame = action.delay) <= maxFrames) { + if (error = action.execute(action.state, action.delay)) { + break; } - }); - } + } - schedule(work: (x?: T) => Subscription | void, delay: number = 0, state?: T): Subscription { - this.sorted = false; - return new VirtualAction(this, work, this.index++).schedule(state, delay); + if (error) { + while (action = actions.shift()) { + action.unsubscribe(); + } + throw error; + } } } @@ -71,58 +44,52 @@ export class VirtualTimeScheduler implements Scheduler { * @ignore * @extends {Ignored} */ -class VirtualAction extends Subscription implements Action { - state: T; - delay: number; - calls = 0; - error: any; +export class VirtualAction extends AsyncAction { - constructor(public scheduler: VirtualTimeScheduler, - public work: (x?: T) => Subscription | void, - public index: number) { - super(); + constructor(protected scheduler: VirtualTimeScheduler, + protected work: (state?: T) => void, + protected index: number = scheduler.index += 1) { + super(scheduler, work); + this.index = scheduler.index = index; } - schedule(state?: T, delay: number = 0): VirtualAction { - if (this.isUnsubscribed) { - return this; - } - const scheduler = this.scheduler; - let action: Action = null; - if (this.calls++ === 0) { - // the action is not being rescheduled. - action = this; - } else { - // the action is being rescheduled, and we can't mutate the one in the actions list - // in the scheduler, so we'll create a new one. - action = new VirtualAction(scheduler, this.work, scheduler.index += 1); - this.add(action); - } - action.state = state; - action.delay = scheduler.frame + delay; - scheduler.addAction(action); - return this; + public schedule(state?: T, delay: number = 0): Subscription { + return !this.id ? + super.schedule(state, delay) : ( + // If an action is rescheduled, we save allocations by mutating its state, + // pushing it to the end of the scheduler queue, and recycling the action. + // But since the VirtualTimeScheduler is used for testing, VirtualActions + // must be immutable so they can be inspected later. + > this.add( + new VirtualAction(this.scheduler, this.work)) + ).schedule(state, delay); } - execute() { - if (this.isUnsubscribed) { - throw new Error('How did did we execute a canceled Action?'); - } - this.work(this.state); + protected requestAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay: number = 0): any { + this.delay = scheduler.frame + delay; + const {actions} = scheduler; + actions.push(this); + actions.sort(VirtualAction.sortActions); + return true; } - unsubscribe() { - const actions = this.scheduler.actions; - const index = actions.indexOf(this); - - this.work = void 0; - this.state = void 0; - this.scheduler = void 0; + protected recycleAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay: number = 0): any { + return undefined; + } - if (index !== -1) { - actions.splice(index, 1); + public static sortActions(a: VirtualAction, b: VirtualAction) { + if (a.delay === b.delay) { + if (a.index === b.index) { + return 0; + } else if (a.index > b.index) { + return 1; + } else { + return -1; + } + } else if (a.delay > b.delay) { + return 1; + } else { + return -1; } - - super.unsubscribe(); } } diff --git a/src/scheduler/animationFrame.ts b/src/scheduler/animationFrame.ts index 87366abac2..0861861bb7 100644 --- a/src/scheduler/animationFrame.ts +++ b/src/scheduler/animationFrame.ts @@ -1,3 +1,4 @@ +import {AnimationFrameAction} from './AnimationFrameAction'; import {AnimationFrameScheduler} from './AnimationFrameScheduler'; -export const animationFrame = new AnimationFrameScheduler(); +export const animationFrame = new AnimationFrameScheduler(AnimationFrameAction); diff --git a/src/scheduler/asap.ts b/src/scheduler/asap.ts index 8d829b5f70..5c69789f2c 100644 --- a/src/scheduler/asap.ts +++ b/src/scheduler/asap.ts @@ -1,3 +1,4 @@ +import {AsapAction} from './AsapAction'; import {AsapScheduler} from './AsapScheduler'; -export const asap = new AsapScheduler(); \ No newline at end of file +export const asap = new AsapScheduler(AsapAction); diff --git a/src/scheduler/async.ts b/src/scheduler/async.ts index fce4d75f68..e0a2219d58 100644 --- a/src/scheduler/async.ts +++ b/src/scheduler/async.ts @@ -1,3 +1,4 @@ +import {AsyncAction} from './AsyncAction'; import {AsyncScheduler} from './AsyncScheduler'; -export const async = new AsyncScheduler(); +export const async = new AsyncScheduler(AsyncAction); diff --git a/src/scheduler/queue.ts b/src/scheduler/queue.ts index 8490d40b51..e2aeb741fb 100644 --- a/src/scheduler/queue.ts +++ b/src/scheduler/queue.ts @@ -1,3 +1,4 @@ +import {QueueAction} from './QueueAction'; import {QueueScheduler} from './QueueScheduler'; -export const queue = new QueueScheduler(); \ No newline at end of file +export const queue = new QueueScheduler(QueueAction); diff --git a/src/testing/HotObservable.ts b/src/testing/HotObservable.ts index defd518f57..25b5edc49a 100644 --- a/src/testing/HotObservable.ts +++ b/src/testing/HotObservable.ts @@ -50,4 +50,3 @@ export class HotObservable extends Subject implements SubscriptionLoggable } } applyMixins(HotObservable, [SubscriptionLoggable]); - diff --git a/src/testing/SubscriptionLoggable.ts b/src/testing/SubscriptionLoggable.ts index a16567479c..84be06a18a 100644 --- a/src/testing/SubscriptionLoggable.ts +++ b/src/testing/SubscriptionLoggable.ts @@ -19,4 +19,3 @@ export class SubscriptionLoggable { ); } } - diff --git a/src/testing/TestScheduler.ts b/src/testing/TestScheduler.ts index fef3892401..dd1b57737d 100644 --- a/src/testing/TestScheduler.ts +++ b/src/testing/TestScheduler.ts @@ -1,5 +1,4 @@ import {Observable} from '../Observable'; -import {VirtualTimeScheduler} from '../scheduler/VirtualTimeScheduler'; import {Notification} from '../Notification'; import {Subject} from '../Subject'; import {ColdObservable} from './ColdObservable'; @@ -7,6 +6,7 @@ import {HotObservable} from './HotObservable'; import {TestMessage} from './TestMessage'; import {SubscriptionLog} from './SubscriptionLog'; import {Subscription} from '../Subscription'; +import {VirtualTimeScheduler} from '../scheduler/VirtualTimeScheduler'; interface FlushableTest { ready: boolean; @@ -240,4 +240,4 @@ export class TestScheduler extends VirtualTimeScheduler { } return testMessages; } -} \ No newline at end of file +}