Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(merge): add higher-order lettable version of merge #2809

Merged
merged 1 commit into from
Aug 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 4 additions & 100 deletions src/operator/merge.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { ArrayObservable } from '../observable/ArrayObservable';
import { MergeAllOperator } from './mergeAll';
import { isScheduler } from '../util/isScheduler';
import { merge as higherOrder } from '../operators';

export { mergeStatic } from '../operators/merge';

/* tslint:disable:max-line-length */
export function merge<T>(this: Observable<T>, scheduler?: IScheduler): Observable<T>;
Expand Down Expand Up @@ -68,101 +68,5 @@ export function merge<T, R>(this: Observable<T>, ...observables: Array<Observabl
* @owner Observable
*/
export function merge<T, R>(this: Observable<T>, ...observables: Array<ObservableInput<any> | IScheduler | number>): Observable<R> {
return this.lift.call(mergeStatic<T, R>(this, ...observables));
}

/* tslint:disable:max-line-length */
export function mergeStatic<T>(v1: ObservableInput<T>, scheduler?: IScheduler): Observable<T>;
export function mergeStatic<T>(v1: ObservableInput<T>, concurrent?: number, scheduler?: IScheduler): Observable<T>;
export function mergeStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<T | T2>;
export function mergeStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2>;
export function mergeStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function mergeStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function mergeStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function mergeStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function mergeStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function mergeStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function mergeStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function mergeStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function mergeStatic<T>(...observables: (ObservableInput<T> | IScheduler | number)[]): Observable<T>;
export function mergeStatic<T, R>(...observables: (ObservableInput<any> | IScheduler | number)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
* Creates an output Observable which concurrently emits all values from every
* given input Observable.
*
* <span class="informal">Flattens multiple Observables together by blending
* their values into one Observable.</span>
*
* <img src="./img/merge.png" width="100%">
*
* `merge` subscribes to each given input Observable (as arguments), and simply
* forwards (without doing any transformation) all the values from all the input
* Observables to the output Observable. The output Observable only completes
* once all input Observables have completed. Any error delivered by an input
* Observable will be immediately emitted on the output Observable.
*
* @example <caption>Merge together two Observables: 1s interval and clicks</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var timer = Rx.Observable.interval(1000);
* var clicksOrTimer = Rx.Observable.merge(clicks, timer);
* clicksOrTimer.subscribe(x => console.log(x));
*
* // Results in the following:
* // timer will emit ascending values, one every second(1000ms) to console
* // clicks logs MouseEvents to console everytime the "document" is clicked
* // Since the two streams are merged you see these happening
* // as they occur.
*
* @example <caption>Merge together 3 Observables, but only 2 run concurrently</caption>
* var timer1 = Rx.Observable.interval(1000).take(10);
* var timer2 = Rx.Observable.interval(2000).take(6);
* var timer3 = Rx.Observable.interval(500).take(10);
* var concurrent = 2; // the argument
* var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
* merged.subscribe(x => console.log(x));
*
* // Results in the following:
* // - First timer1 and timer2 will run concurrently
* // - timer1 will emit a value every 1000ms for 10 iterations
* // - timer2 will emit a value every 2000ms for 6 iterations
* // - after timer1 hits it's max iteration, timer2 will
* // continue, and timer3 will start to run concurrently with timer2
* // - when timer2 hits it's max iteration it terminates, and
* // timer3 will continue to emit a value every 500ms until it is complete
*
* @see {@link mergeAll}
* @see {@link mergeMap}
* @see {@link mergeMapTo}
* @see {@link mergeScan}
*
* @param {...ObservableInput} observables Input Observables to merge together.
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input
* Observables being subscribed to concurrently.
* @param {Scheduler} [scheduler=null] The IScheduler to use for managing
* concurrency of input Observables.
* @return {Observable} an Observable that emits items that are the result of
* every input Observable.
* @static true
* @name merge
* @owner Observable
*/
export function mergeStatic<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): Observable<R> {
let concurrent = Number.POSITIVE_INFINITY;
let scheduler: IScheduler = null;
let last: any = observables[observables.length - 1];
if (isScheduler(last)) {
scheduler = <IScheduler>observables.pop();
if (observables.length > 1 && typeof observables[observables.length - 1] === 'number') {
concurrent = <number>observables.pop();
}
} else if (typeof last === 'number') {
concurrent = <number>observables.pop();
}

if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) {
return <Observable<R>>observables[0];
}

return new ArrayObservable(<any>observables, scheduler).lift(new MergeAllOperator<R>(concurrent));
return higherOrder(...observables)(this);
}
1 change: 1 addition & 0 deletions src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export { ignoreElements } from './ignoreElements';
export { map } from './map';
export { materialize } from './materialize';
export { max } from './max';
export { merge } from './merge';
export { mergeAll } from './mergeAll';
export { mergeMap } from './mergeMap';
export { min } from './min';
Expand Down
123 changes: 123 additions & 0 deletions src/operators/merge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { ArrayObservable } from '../observable/ArrayObservable';
import { mergeAll } from './mergeAll';
import { isScheduler } from '../util/isScheduler';
import { OperatorFunction, MonoTypeOperatorFunction } from '../interfaces';

/* tslint:disable:max-line-length */
export function merge<T>(scheduler?: IScheduler): MonoTypeOperatorFunction<T>;
export function merge<T>(concurrent?: number, scheduler?: IScheduler): MonoTypeOperatorFunction<T>;
export function merge<T, T2>(v2: ObservableInput<T2>, scheduler?: IScheduler): OperatorFunction<T, T | T2>;
export function merge<T, T2>(v2: ObservableInput<T2>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2>;
export function merge<T, T2, T3>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3>;
export function merge<T, T2, T3>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3>;
export function merge<T, T2, T3, T4>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4>;
export function merge<T, T2, T3, T4>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4>;
export function merge<T, T2, T3, T4, T5>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5>;
export function merge<T, T2, T3, T4, T5>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5>;
export function merge<T, T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6>;
export function merge<T, T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6>;
export function merge<T>(...observables: Array<ObservableInput<T> | IScheduler | number>): MonoTypeOperatorFunction<T>;
export function merge<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */

export function merge<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift.call(mergeStatic(source, ...observables));
}

/* tslint:disable:max-line-length */
export function mergeStatic<T>(v1: ObservableInput<T>, scheduler?: IScheduler): Observable<T>;
export function mergeStatic<T>(v1: ObservableInput<T>, concurrent?: number, scheduler?: IScheduler): Observable<T>;
export function mergeStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<T | T2>;
export function mergeStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2>;
export function mergeStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function mergeStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function mergeStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function mergeStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function mergeStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function mergeStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function mergeStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function mergeStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function mergeStatic<T>(...observables: (ObservableInput<T> | IScheduler | number)[]): Observable<T>;
export function mergeStatic<T, R>(...observables: (ObservableInput<any> | IScheduler | number)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
* Creates an output Observable which concurrently emits all values from every
* given input Observable.
*
* <span class="informal">Flattens multiple Observables together by blending
* their values into one Observable.</span>
*
* <img src="./img/merge.png" width="100%">
*
* `merge` subscribes to each given input Observable (as arguments), and simply
* forwards (without doing any transformation) all the values from all the input
* Observables to the output Observable. The output Observable only completes
* once all input Observables have completed. Any error delivered by an input
* Observable will be immediately emitted on the output Observable.
*
* @example <caption>Merge together two Observables: 1s interval and clicks</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var timer = Rx.Observable.interval(1000);
* var clicksOrTimer = Rx.Observable.merge(clicks, timer);
* clicksOrTimer.subscribe(x => console.log(x));
*
* // Results in the following:
* // timer will emit ascending values, one every second(1000ms) to console
* // clicks logs MouseEvents to console everytime the "document" is clicked
* // Since the two streams are merged you see these happening
* // as they occur.
*
* @example <caption>Merge together 3 Observables, but only 2 run concurrently</caption>
* var timer1 = Rx.Observable.interval(1000).take(10);
* var timer2 = Rx.Observable.interval(2000).take(6);
* var timer3 = Rx.Observable.interval(500).take(10);
* var concurrent = 2; // the argument
* var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
* merged.subscribe(x => console.log(x));
*
* // Results in the following:
* // - First timer1 and timer2 will run concurrently
* // - timer1 will emit a value every 1000ms for 10 iterations
* // - timer2 will emit a value every 2000ms for 6 iterations
* // - after timer1 hits it's max iteration, timer2 will
* // continue, and timer3 will start to run concurrently with timer2
* // - when timer2 hits it's max iteration it terminates, and
* // timer3 will continue to emit a value every 500ms until it is complete
*
* @see {@link mergeAll}
* @see {@link mergeMap}
* @see {@link mergeMapTo}
* @see {@link mergeScan}
*
* @param {...ObservableInput} observables Input Observables to merge together.
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input
* Observables being subscribed to concurrently.
* @param {Scheduler} [scheduler=null] The IScheduler to use for managing
* concurrency of input Observables.
* @return {Observable} an Observable that emits items that are the result of
* every input Observable.
* @static true
* @name merge
* @owner Observable
*/
export function mergeStatic<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): Observable<R> {
let concurrent = Number.POSITIVE_INFINITY;
let scheduler: IScheduler = null;
let last: any = observables[observables.length - 1];
if (isScheduler(last)) {
scheduler = <IScheduler>observables.pop();
if (observables.length > 1 && typeof observables[observables.length - 1] === 'number') {
concurrent = <number>observables.pop();
}
} else if (typeof last === 'number') {
concurrent = <number>observables.pop();
}

if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) {
return <Observable<R>>observables[0];
}

return mergeAll(concurrent)(new ArrayObservable(<any>observables, scheduler));
}