From 2edd92c25a0e9d2d9af2c2bd2eba001b560c3f57 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Fri, 20 Nov 2015 11:33:43 -0800 Subject: [PATCH] feat(combineLatest): accept array of observable as parameter closes #594 --- spec/observables/combineLatest-spec.js | 14 +++++++++++++- spec/operators/combineLatest-spec.js | 19 ++++++++++++++++++- src/Observable.ts | 9 +++++++-- src/operators/combineLatest-static.ts | 19 +++++++++++++++---- src/operators/combineLatest.ts | 19 +++++++++++++++---- 5 files changed, 68 insertions(+), 12 deletions(-) diff --git a/spec/observables/combineLatest-spec.js b/spec/observables/combineLatest-spec.js index d541441578..c6a323cb1e 100644 --- a/spec/observables/combineLatest-spec.js +++ b/spec/observables/combineLatest-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect, hot, cold, expectObservable */ +/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; var immediateScheduler = Rx.Scheduler.immediate; @@ -29,6 +29,18 @@ describe('Observable.combineLatest', function () { }); }); + it('should accept array of observables', function () { + var firstSource = hot('----a----b----c----|'); + var secondSource = hot('--d--e--f--g--|'); + var expected = '----uv--wx-y--z----|'; + + var combined = Observable.combineLatest([firstSource, secondSource], function (a, b) { + return '' + a + b; + }); + + expectObservable(combined).toBe(expected, {u: 'ad', v: 'ae', w: 'af', x: 'bf', y: 'bg', z: 'cg'}); + }); + it('should work with two nevers', function () { var e1 = cold( '-'); var e1subs = '^'; diff --git a/spec/operators/combineLatest-spec.js b/spec/operators/combineLatest-spec.js index 6fe65e3a07..6563dfcb02 100644 --- a/spec/operators/combineLatest-spec.js +++ b/spec/operators/combineLatest-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect, hot, cold, expectObservable */ +/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; var immediateScheduler = Rx.Scheduler.immediate; @@ -145,6 +145,23 @@ describe('Observable.prototype.combineLatest', function () { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + it('should accept array of observables', function () { + var e1 = hot('--a--^--b--c--|'); + var e1subs = '^ !'; + var e2 = hot('---e-^---f--g--|'); + var e2subs = '^ !'; + var e3 = hot('---h-^----i--j-|'); + var e3subs = '^ !'; + var expected = '-----wxyz-|'; + + var result = e1.combineLatest([e2, e3], function (x, y, z) { return x + y + z; }); + + expectObservable(result).toBe(expected, { w: 'bfi', x: 'cfi', y: 'cgi', z: 'cgj' }); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + expectSubscriptions(e3.subscriptions).toBe(e3subs); + }); + it('should work with empty and error', function () { var e1 = hot('----------|'); //empty var e1subs = '^ !'; diff --git a/src/Observable.ts b/src/Observable.ts index 20b3017573..c8c595cec0 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -134,7 +134,10 @@ export class Observable implements CoreOperators { } // static method stubs - static combineLatest: (...observables: Array | ((...values: Array) => T) | Scheduler>) => Observable; + static combineLatest: (...observables: Array | + Array> | + ((...values: Array) => T) | + Scheduler>) => Observable; static concat: (...observables: Array | Scheduler>) => Observable; static defer: (observableFactory: () => Observable) => Observable; static empty: (scheduler?: Scheduler) => Observable; @@ -166,7 +169,9 @@ export class Observable implements CoreOperators { bufferWhen: (closingSelector: () => Observable) => Observable; catch: (selector: (err: any, source: Observable, caught: Observable) => Observable) => Observable; combineAll: (project?: (...values: Array) => R) => Observable; - combineLatest: (...observables: Array | ((...values: Array) => R)>) => Observable; + combineLatest: (...observables: Array | + Array> | + ((...values: Array) => R)>) => Observable; concat: (...observables: (Observable | Scheduler)[]) => Observable; concatAll: () => Observable; concatMap: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; diff --git a/src/operators/combineLatest-static.ts b/src/operators/combineLatest-static.ts index 90e2defcbc..01ef91799b 100644 --- a/src/operators/combineLatest-static.ts +++ b/src/operators/combineLatest-static.ts @@ -3,6 +3,7 @@ import {ArrayObservable} from '../observables/ArrayObservable'; import {CombineLatestOperator} from './combineLatest-support'; import {Scheduler} from '../Scheduler'; import {isScheduler} from '../util/isScheduler'; +import {isArray} from '../util/isArray'; /** * Combines the values from observables passed as arguments. This is done by subscribing @@ -14,15 +15,25 @@ import {isScheduler} from '../util/isScheduler'; * @returns {Observable} an observable of other projected values from the most recent values from each observable, or an array of each of * the most recent values from each observable. */ -export function combineLatest(...observables: Array | ((...values: Array) => R) | Scheduler>): Observable { - let project, scheduler; +export function combineLatest(...observables: Array | + Array> | + ((...values: Array) => R) | + Scheduler>): Observable { + let project: (...values: Array) => R = null; + let scheduler: Scheduler = null; if (isScheduler(observables[observables.length - 1])) { - scheduler = observables.pop(); + scheduler = observables.pop(); } if (typeof observables[observables.length - 1] === 'function') { - project = observables.pop(); + project = <(...values: Array) => R>observables.pop(); + } + + // if the first and only other argument besides the resultSelector is an array + // assume it's been called with `combineLatest([obs1, obs2, obs3], project)` + if (observables.length === 1 && isArray(observables[0])) { + observables = >>observables[0]; } return new ArrayObservable(observables, scheduler).lift(new CombineLatestOperator(project)); diff --git a/src/operators/combineLatest.ts b/src/operators/combineLatest.ts index 887dc1d318..ec667643a2 100644 --- a/src/operators/combineLatest.ts +++ b/src/operators/combineLatest.ts @@ -1,6 +1,7 @@ import {Observable} from '../Observable'; import {ArrayObservable} from '../observables/ArrayObservable'; import {CombineLatestOperator} from './combineLatest-support'; +import {isArray} from '../util/isArray'; /** * Combines the values from this observable with values from observables passed as arguments. This is done by subscribing @@ -12,11 +13,21 @@ import {CombineLatestOperator} from './combineLatest-support'; * @returns {Observable} an observable of other projected values from the most recent values from each observable, or an array of each of * the most recent values from each observable. */ -export function combineLatest(...observables: Array | ((...values: Array) => R)>): Observable { - observables.unshift(this); - let project; +export function combineLatest(...observables: Array | + Array> | + ((...values: Array) => R)>): Observable { + let project: (...values: Array) => R = null; if (typeof observables[observables.length - 1] === 'function') { - project = observables.pop(); + project = <(...values: Array) => R>observables.pop(); + } + + // if the first and only other argument besides the resultSelector is an array + // assume it's been called with `combineLatest([obs1, obs2, obs3], project)` + if (observables.length === 1 && isArray(observables[0])) { + observables = >>observables[0]; } + + observables.unshift(this); + return new ArrayObservable(observables).lift(new CombineLatestOperator(project)); } \ No newline at end of file