Skip to content

Commit

Permalink
refactor(Operators): use isScheduler utility function
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Nov 12, 2015
1 parent 536a6a6 commit 8bca656
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/observables/ArrayObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {ScalarObservable} from './ScalarObservable';
import {EmptyObservable} from './EmptyObservable';
import {isScheduler} from '../util/isScheduler';

export class ArrayObservable<T> extends Observable<T> {

Expand All @@ -11,7 +12,7 @@ export class ArrayObservable<T> extends Observable<T> {

static of<T>(...array: Array<T | Scheduler>): Observable<T> {
let scheduler = <Scheduler>array[array.length - 1];
if (scheduler && typeof scheduler.schedule === 'function') {
if (isScheduler(scheduler)) {
array.pop();
} else {
scheduler = void 0;
Expand Down
3 changes: 2 additions & 1 deletion src/observables/TimerObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {isNumeric} from '../util/isNumeric';
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {nextTick} from '../schedulers/nextTick';
import {isScheduler} from '../util/isScheduler';

export class TimerObservable<T> extends Observable<T> {

Expand Down Expand Up @@ -39,7 +40,7 @@ export class TimerObservable<T> extends Observable<T> {
super();
if (isNumeric(period)) {
this._period = Number(period) < 1 && 1 || Number(period);
} else if (period && typeof (<Scheduler> period).schedule === 'function') {
} else if (isScheduler(period)) {
scheduler = <Scheduler> period;
}
if (!scheduler || typeof scheduler.schedule !== 'function') {
Expand Down
3 changes: 2 additions & 1 deletion src/operators/combineLatest-static.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Observable} from '../Observable';
import {ArrayObservable} from '../observables/ArrayObservable';
import {CombineLatestOperator} from './combineLatest-support';
import {Scheduler} from '../Scheduler';
import {isScheduler} from '../util/isScheduler';

/**
* Combines the values from observables passed as arguments. This is done by subscribing
Expand All @@ -16,7 +17,7 @@ import {Scheduler} from '../Scheduler';
export function combineLatest<R>(...observables: Array<Observable<any> | ((...values: Array<any>) => R) | Scheduler>): Observable<R> {
let project, scheduler;

if (typeof (<any>observables[observables.length - 1]).schedule === 'function') {
if (isScheduler(observables[observables.length - 1])) {
scheduler = observables.pop();
}

Expand Down
3 changes: 2 additions & 1 deletion src/operators/concat-static.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Scheduler} from '../Scheduler';
import {immediate} from '../schedulers/immediate';
import {MergeAllOperator} from './mergeAll-support';
import {ArrayObservable} from '../observables/ArrayObservable';
import {isScheduler} from '../util/isScheduler';

/**
* Joins multiple observables together by subscribing to them one at a time and merging their results
Expand All @@ -14,7 +15,7 @@ import {ArrayObservable} from '../observables/ArrayObservable';
export function concat<R>(...observables: Array<Observable<any> | Scheduler>): Observable<R> {
let scheduler: Scheduler = immediate;
let args = <any[]>observables;
if (typeof (args[observables.length - 1]).schedule === 'function') {
if (isScheduler(args[observables.length - 1])) {
scheduler = args.pop();
}

Expand Down
3 changes: 2 additions & 1 deletion src/operators/concat.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {CoreOperators} from '../CoreOperators';
import {isScheduler} from '../util/isScheduler';

/**
* Joins this observable with multiple other observables by subscribing to them one at a time, starting with the source,
Expand All @@ -13,7 +14,7 @@ import {CoreOperators} from '../CoreOperators';
export function concat<R>(...observables: (Observable<any> | Scheduler)[]): Observable<R> {
let args = <any[]>observables;
args.unshift(this);
if (args.length > 1 && typeof args[args.length - 1].schedule === 'function') {
if (args.length > 1 && isScheduler(args[args.length - 1])) {
args.splice(args.length - 2, 0, 1);
}
return (<CoreOperators<any>>Observable.fromArray(args)).mergeAll(1);
Expand Down
3 changes: 2 additions & 1 deletion src/operators/merge-static.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ import {Observable} from '../Observable';
import {ArrayObservable} from '../observables/ArrayObservable';
import {MergeAllOperator} from './mergeAll-support';
import {immediate} from '../schedulers/immediate';
import {isScheduler} from '../util/isScheduler';

export function merge<R>(...observables: Array<Observable<any> | Scheduler | number>): Observable<R> {
let concurrent = Number.POSITIVE_INFINITY;
let scheduler: Scheduler = immediate;
let last: any = observables[observables.length - 1];
if (typeof last.schedule === 'function') {
if (isScheduler(last)) {
scheduler = <Scheduler>observables.pop();
if (observables.length > 1 && typeof observables[observables.length - 1] === 'number') {
concurrent = <number>observables.pop();
Expand Down
3 changes: 2 additions & 1 deletion src/operators/startWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import {ArrayObservable} from '../observables/ArrayObservable';
import {ScalarObservable} from '../observables/ScalarObservable';
import {EmptyObservable} from '../observables/EmptyObservable';
import {concat} from './concat-static';
import {isScheduler} from '../util/isScheduler';

export function startWith<T>(...array: (T | Scheduler)[]): Observable<T> {
let scheduler = <Scheduler>array[array.length - 1];
if (scheduler && typeof scheduler.schedule === 'function') {
if (isScheduler(scheduler)) {
array.pop();
} else {
scheduler = void 0;
Expand Down

0 comments on commit 8bca656

Please sign in to comment.