From c4125ffd2cfd2b2151a87cb79999d7398d85b3d0 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 15 Jun 2017 09:33:30 -0700 Subject: [PATCH] feat(concatMap): add higher-order lettable version of concatMap --- src/operator/concatMap.ts | 4 +- src/operator/mergeMap.ts | 102 ------------------------------------- src/operators/concatMap.ts | 72 ++++++++++++++++++++++++++ src/operators/index.ts | 2 +- 4 files changed, 75 insertions(+), 105 deletions(-) create mode 100644 src/operators/concatMap.ts diff --git a/src/operator/concatMap.ts b/src/operator/concatMap.ts index 4999e5ac43..9f41ab4bfb 100644 --- a/src/operator/concatMap.ts +++ b/src/operator/concatMap.ts @@ -1,4 +1,4 @@ -import { MergeMapOperator } from './mergeMap'; +import { concatMap as higherOrderConcatMap } from '../operators'; import { Observable, ObservableInput } from '../Observable'; /* tslint:disable:max-line-length */ @@ -67,5 +67,5 @@ export function concatMap(this: Observable, project: (value: T, inde */ export function concatMap(this: Observable, project: (value: T, index: number) => ObservableInput, resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { - return this.lift(new MergeMapOperator(project, resultSelector, 1)); + return higherOrderConcatMap(project, resultSelector)(this); } diff --git a/src/operator/mergeMap.ts b/src/operator/mergeMap.ts index dbcee0783f..b4fdaec05f 100644 --- a/src/operator/mergeMap.ts +++ b/src/operator/mergeMap.ts @@ -1,10 +1,4 @@ import { Observable, ObservableInput } from '../Observable'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; import { mergeMap as higherOrderMergeMap } from '../operators'; /* tslint:disable:max-line-length */ @@ -75,99 +69,3 @@ export function mergeMap(this: Observable, project: (value: T, index concurrent: number = Number.POSITIVE_INFINITY): Observable { return higherOrderMergeMap(project, resultSelector, concurrent)(this); } - -export class MergeMapOperator implements Operator { - constructor(private project: (value: T, index: number) => ObservableInput, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, - private concurrent: number = Number.POSITIVE_INFINITY) { - } - - call(observer: Subscriber, source: any): any { - return source.subscribe(new MergeMapSubscriber( - observer, this.project, this.resultSelector, this.concurrent - )); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class MergeMapSubscriber extends OuterSubscriber { - private hasCompleted: boolean = false; - private buffer: T[] = []; - private active: number = 0; - protected index: number = 0; - - constructor(destination: Subscriber, - private project: (value: T, index: number) => ObservableInput, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, - private concurrent: number = Number.POSITIVE_INFINITY) { - super(destination); - } - - protected _next(value: T): void { - if (this.active < this.concurrent) { - this._tryNext(value); - } else { - this.buffer.push(value); - } - } - - protected _tryNext(value: T) { - let result: ObservableInput; - const index = this.index++; - try { - result = this.project(value, index); - } catch (err) { - this.destination.error(err); - return; - } - this.active++; - this._innerSub(result, value, index); - } - - private _innerSub(ish: ObservableInput, value: T, index: number): void { - this.add(subscribeToResult(this, ish, value, index)); - } - - protected _complete(): void { - this.hasCompleted = true; - if (this.active === 0 && this.buffer.length === 0) { - this.destination.complete(); - } - } - - notifyNext(outerValue: T, innerValue: I, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - if (this.resultSelector) { - this._notifyResultSelector(outerValue, innerValue, outerIndex, innerIndex); - } else { - this.destination.next(innerValue); - } - } - - private _notifyResultSelector(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) { - let result: R; - try { - result = this.resultSelector(outerValue, innerValue, outerIndex, innerIndex); - } catch (err) { - this.destination.error(err); - return; - } - this.destination.next(result); - } - - notifyComplete(innerSub: Subscription): void { - const buffer = this.buffer; - this.remove(innerSub); - this.active--; - if (buffer.length > 0) { - this._next(buffer.shift()); - } else if (this.active === 0 && this.hasCompleted) { - this.destination.complete(); - } - } -} diff --git a/src/operators/concatMap.ts b/src/operators/concatMap.ts new file mode 100644 index 0000000000..1ffe4f819e --- /dev/null +++ b/src/operators/concatMap.ts @@ -0,0 +1,72 @@ +import { mergeMap } from './mergeMap'; +import { ObservableInput } from '../Observable'; +import { OperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function concatMap(project: (value: T, index: number) => ObservableInput): OperatorFunction; +export function concatMap(project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Projects each source value to an Observable which is merged in the output + * Observable, in a serialized fashion waiting for each one to complete before + * merging the next. + * + * Maps each value to an Observable, then flattens all of + * these inner Observables using {@link concatAll}. + * + * + * + * Returns an Observable that emits items based on applying a function that you + * supply to each item emitted by the source Observable, where that function + * returns an (so-called "inner") Observable. Each new inner Observable is + * concatenated with the previous inner Observable. + * + * __Warning:__ if source values arrive endlessly and faster than their + * corresponding inner Observables can complete, it will result in memory issues + * as inner Observables amass in an unbounded buffer waiting for their turn to + * be subscribed to. + * + * Note: `concatMap` is equivalent to `mergeMap` with concurrency parameter set + * to `1`. + * + * @example For each click event, tick every second from 0 to 3, with no concurrency + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.concatMap(ev => Rx.Observable.interval(1000).take(4)); + * result.subscribe(x => console.log(x)); + * + * // Results in the following: + * // (results are not concurrent) + * // For every click on the "document" it will emit values 0 to 3 spaced + * // on a 1000ms interval + * // one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 + * + * @see {@link concat} + * @see {@link concatAll} + * @see {@link concatMapTo} + * @see {@link exhaustMap} + * @see {@link mergeMap} + * @see {@link switchMap} + * + * @param {function(value: T, ?index: number): ObservableInput} project A function + * that, when applied to an item emitted by the source Observable, returns an + * Observable. + * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] + * A function to produce the value on the output Observable based on the values + * and the indices of the source (outer) emission and the inner Observable + * emission. The arguments passed to this function are: + * - `outerValue`: the value that came from the source + * - `innerValue`: the value that came from the projected Observable + * - `outerIndex`: the "index" of the value that came from the source + * - `innerIndex`: the "index" of the value from the projected Observable + * @return {Observable} An Observable that emits the result of applying the + * projection function (and the optional `resultSelector`) to each item emitted + * by the source Observable and taking values from each projected inner + * Observable sequentially. + * @method concatMap + * @owner Observable + */ +export function concatMap(project: (value: T, index: number) => ObservableInput, + resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { + return mergeMap(project, resultSelector, 1); +} diff --git a/src/operators/index.ts b/src/operators/index.ts index a1f0fd57d8..1ffa32de59 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -1,4 +1,4 @@ +export { concatMap } from './concatMap'; export { filter } from './filter'; - export { map } from './map'; export { mergeMap } from './mergeMap';