diff --git a/spec/observables/forkJoin-spec.js b/spec/observables/forkJoin-spec.js index a9b7daed42..a2dcc8eb7d 100644 --- a/spec/observables/forkJoin-spec.js +++ b/spec/observables/forkJoin-spec.js @@ -1,14 +1,155 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, lowerCaseO, hot, expectObservable */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; describe('Observable.forkJoin', function () { - it('should join the last values of the provided observables into an array', function (done) { - Observable.forkJoin(Observable.of(1, 2, 3, 'a'), - Observable.of('b'), - Observable.of(1, 2, 3, 4, 'c')) - .subscribe(function (x) { - expect(x).toEqual(['a', 'b', 'c']); - }, null, done); - }); -}); \ No newline at end of file + it('should join the last values of the provided observables into an array', function () { + var e1 = Observable.forkJoin( + hot('--a--b--c--d--|'), + hot('(b|)'), + hot('--1--2--3--|') + ); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); + }); + + it('should accept lowercase-o observables', function () { + var e1 = Observable.forkJoin( + hot('--a--b--c--d--|'), + hot('(b|)'), + lowerCaseO('1', '2', '3') + ); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); + }); + + it('should accept promise', function (done) { + var e1 = Observable.forkJoin( + Observable.of(1), + Promise.resolve(2) + ); + + e1.subscribe(function (x) { + expect(x).toEqual([1,2]); + }, + function (err) { + done.fail('should not be called'); + }, + done); + }); + + it('forkJoin n-ary parameters empty', function () { + var e1 = Observable.forkJoin( + hot('--a--b--c--d--|'), + hot('(b|)'), + hot('------------------|') + ); + var expected = '------------------|'; + + expectObservable(e1).toBe(expected); + }); + + it('forkJoin n-ary parameters empty before end', function () { + var e1 = Observable.forkJoin( + hot('--a--b--c--d--|'), + hot('(b|)'), + hot('---------|') + ); + var expected = '---------|'; + + expectObservable(e1).toBe(expected); + }); + + it('forkJoin empty empty', function () { + var e1 = Observable.forkJoin( + hot('--------------|'), + hot('---------|') + ); + var expected = '---------|'; + + expectObservable(e1).toBe(expected); + }); + + it('forkJoin none', function () { + var e1 = Observable.forkJoin(); + var expected = '|'; + + expectObservable(e1).toBe(expected); + }); + + it('forkJoin empty return', function () { + function selector(x, y) { + return x + y; + } + + var e1 = Observable.forkJoin( + hot('--a--b--c--d--|'), + hot('---------|'), + selector); + var expected = '---------|'; + + expectObservable(e1).toBe(expected); + }); + + it('forkJoin return return', function () { + function selector(x, y) { + return x + y; + } + + var e1 = Observable.forkJoin( + hot('--a--b--c--d--|'), + hot('---2-----|'), + selector); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: 'd2'}); + }); + + it('forkJoin empty throw', function () { + var e1 = Observable.forkJoin( + hot('------#'), + hot('---------|')); + var expected = '------#'; + + expectObservable(e1).toBe(expected); + }); + + it('forkJoin empty throw', function () { + function selector(x, y) { + return x + y; + } + + var e1 = Observable.forkJoin( + hot('------#'), + hot('---------|'), + selector); + var expected = '------#'; + + expectObservable(e1).toBe(expected); + }); + + it('forkJoin return throw', function () { + var e1 = Observable.forkJoin( + hot('------#'), + hot('---a-----|')); + var expected = '------#'; + + expectObservable(e1).toBe(expected); + }); + + it('forkJoin return throw', function () { + function selector(x, y) { + return x + y; + } + + var e1 = Observable.forkJoin( + hot('------#'), + hot('-------b-|'), + selector); + var expected = '------#'; + + expectObservable(e1).toBe(expected); + }); +}); diff --git a/src/Observable.ts b/src/Observable.ts index de58d9bc71..657b59513b 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -138,7 +138,7 @@ export class Observable implements CoreOperators { static concat: (...observables: Array | Scheduler>) => Observable; static defer: (observableFactory: () => Observable) => Observable; static empty: (scheduler?: Scheduler) => Observable; - static forkJoin: (...observables: Observable[]) => Observable; + static forkJoin: (...sources: Array | Promise | ((...values: Array) => any)>) => Observable; static from: (iterable: any, scheduler?: Scheduler) => Observable; static fromArray: (array: T[], scheduler?: Scheduler) => Observable; static fromEvent: (element: any, eventName: string, selector?: (...args:Array) => T) => Observable; diff --git a/src/observables/ForkJoinObservable.ts b/src/observables/ForkJoinObservable.ts index 0d1b3b6829..00b4a2c824 100644 --- a/src/observables/ForkJoinObservable.ts +++ b/src/observables/ForkJoinObservable.ts @@ -1,50 +1,96 @@ import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; +import {PromiseObservable} from './PromiseObservable'; +import {EmptyObservable} from './EmptyObservable'; +import {isPromise} from '../util/isPromise'; export class ForkJoinObservable extends Observable { - constructor(private observables: Observable[]) { - super(); + constructor(private sources: Array | + Promise | + ((...values: Array) => any)>) { + super(); + } + + static create(...sources: Array | + Promise | + ((...values: Array) => any)>) + : Observable { + if (sources === null || sources.length === 0) { + return new EmptyObservable(); + } + return new ForkJoinObservable(sources); } - static create(...observables: Observable[]): Observable { - return new ForkJoinObservable(observables); + private getResultSelector(): (...values: Array) => any { + const sources = this.sources; + + let resultSelector = sources[sources.length - 1]; + if (typeof resultSelector !== 'function') { + return null; + } + this.sources.pop(); + return <(...values: Array) => any>resultSelector; } _subscribe(subscriber: Subscriber) { - const observables = this.observables; - const len = observables.length; - let context = { complete: 0, total: len, values: emptyArray(len) }; + let resultSelector = this.getResultSelector(); + const sources = this.sources; + const len = sources.length; + + const context = { completed: 0, total: len, values: emptyArray(len), selector: resultSelector }; for (let i = 0; i < len; i++) { - observables[i].subscribe(new AllSubscriber(subscriber, this, i, context)); + let source = sources[i]; + if (isPromise(source)) { + source = new PromiseObservable(>source); + } + (>source).subscribe(new AllSubscriber(subscriber, i, context)); } } } class AllSubscriber extends Subscriber { - private _value: T; + private _value: any = null; - constructor(destination: Subscriber, - private parent: ForkJoinObservable, + constructor(destination: Subscriber, private index: number, - private context: { complete: number, total: number, values: any[] }) { + private context: { completed: number, + total: number, + values: any[], + selector: (...values: Array) => any }) { super(destination); } - _next(value: T) { + _next(value: any): void { this._value = value; } - _complete() { + _complete(): void { + const destination = this.destination; + + if (this._value == null) { + destination.complete(); + } + const context = this.context; + context.completed++; context.values[this.index] = this._value; - if (context.values.every(hasValue)) { - this.destination.next(context.values); - this.destination.complete(); + const values = context.values; + + if (context.completed !== values.length) { + return; + } + + if (values.every(hasValue)) { + let value = context.selector ? context.selector.apply(this, values) : + values; + destination.next(value); } + + destination.complete(); } } -function hasValue(x) { +function hasValue(x: any): boolean { return x !== null; } @@ -54,4 +100,4 @@ function emptyArray(len: number): any[] { arr.push(null); } return arr; -} \ No newline at end of file +} diff --git a/src/observables/PromiseObservable.ts b/src/observables/PromiseObservable.ts index a015eb5796..75042ed230 100644 --- a/src/observables/PromiseObservable.ts +++ b/src/observables/PromiseObservable.ts @@ -13,7 +13,7 @@ export class PromiseObservable extends Observable { return new PromiseObservable(promise, scheduler); } - constructor(private promise: Promise, public scheduler: Scheduler) { + constructor(private promise: Promise, public scheduler: Scheduler = immediate) { super(); } diff --git a/src/operators/debounce.ts b/src/operators/debounce.ts index 2fddc22b1d..6d15c009c0 100644 --- a/src/operators/debounce.ts +++ b/src/operators/debounce.ts @@ -5,6 +5,7 @@ import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; import {tryCatch} from '../util/tryCatch'; +import {isPromise} from '../util/isPromise'; import {errorObject} from '../util/errorObject'; export function debounce(durationSelector: (value: T) => Observable | Promise): Observable { @@ -41,8 +42,7 @@ class DebounceSubscriber extends Subscriber { if (debounce === errorObject) { destination.error(errorObject.e); } else { - if (typeof debounce.subscribe !== 'function' - && typeof debounce.then === 'function') { + if (isPromise(debounce)) { debounce = PromiseObservable.create(debounce); } @@ -102,4 +102,4 @@ class DurationSelectorSubscriber extends Subscriber { _complete() { this.debounceNext(); } -} \ No newline at end of file +} diff --git a/src/util/isPromise.ts b/src/util/isPromise.ts new file mode 100644 index 0000000000..97d592b649 --- /dev/null +++ b/src/util/isPromise.ts @@ -0,0 +1,3 @@ +export function isPromise(value: any): boolean { + return value && typeof value.subscribe !== 'function' && typeof value.then === 'function'; +} \ No newline at end of file